Modified: pig/branches/spark/test/org/apache/pig/tez/TestTezAutoParallelism.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestTezAutoParallelism.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/tez/TestTezAutoParallelism.java 
(original)
+++ pig/branches/spark/test/org/apache/pig/tez/TestTezAutoParallelism.java Fri 
Mar  4 18:17:39 2016
@@ -20,6 +20,7 @@ package org.apache.pig.tez;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
@@ -30,6 +31,7 @@ import java.util.List;
 import java.util.Properties;
 import java.util.Random;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -38,6 +40,8 @@ import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigServer;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezDagBuilder;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezJobCompiler;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.ParallelismSetter;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -52,7 +56,7 @@ import org.junit.Test;
 public class TestTezAutoParallelism {
     private static final String INPUT_FILE1 = 
TestTezAutoParallelism.class.getName() + "_1";
     private static final String INPUT_FILE2 = 
TestTezAutoParallelism.class.getName() + "_2";
-    private static final String INPUT_DIR = "build/test/data";
+    private static final String INPUT_DIR = 
Util.getTestDirectory(TestTezAutoParallelism.class);
 
     private static PigServer pigServer;
     private static Properties properties;
@@ -62,6 +66,8 @@ public class TestTezAutoParallelism {
     public static void oneTimeSetUp() throws Exception {
         cluster = 
MiniGenericCluster.buildCluster(MiniGenericCluster.EXECTYPE_TEZ);
         properties = cluster.getProperties();
+        //Test spilling to disk as tests here have multiple splits
+        
properties.setProperty(PigConfiguration.PIG_TEZ_INPUT_SPLITS_MEM_THRESHOLD, 
"10");
         createFiles();
     }
 
@@ -83,7 +89,7 @@ public class TestTezAutoParallelism {
     }
 
     private static void createFiles() throws IOException {
-        new File(INPUT_DIR).mkdir();
+        new File(INPUT_DIR).mkdirs();
 
         PrintWriter w = new PrintWriter(new FileWriter(INPUT_DIR + "/" + 
INPUT_FILE1));
 
@@ -223,7 +229,7 @@ public class TestTezAutoParallelism {
         // skewed join parallelism is 3 originally, increase to 5
         
pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION,
 "true");
         
pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE,
 "3000");
-        
pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
 "80000");
+        
pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
 "40000");
         pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as 
(name:chararray, age:int);");
         pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as 
(name:chararray, gender:chararray);");
         pigServer.registerQuery("C = join A by name, B by name using 
'skewed';");
@@ -238,26 +244,158 @@ public class TestTezAutoParallelism {
                 return false;
             }
         });
+        assertEquals(files.length, 5);
+    }
+
+    /**
+     * Runs a skewed full outer join of the two input files, stores the result in
+     * "output6" and counts the "part*" files found in that directory.
+     */
+    @Test
+    public void testSkewedFullJoinIncreaseParallelism() throws IOException{
+        // skewed full join parallelism takes the initial setting, since the join vertex has a
+        // broadcast(sample) dependency, which prevents it from changing parallelism
+        pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+        pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+        pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000");
+        pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
+        pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
+        pigServer.registerQuery("C = join A by name full, B by name using 'skewed';");
+        String outputDir = "output6";
+        pigServer.store("C", outputDir);
+        FileSystem fs = cluster.getFileSystem();
+        // List the directory this test stored to, so the check covers this test's own output
+        // rather than a directory written by another test.
+        FileStatus[] files = fs.listStatus(new Path(outputDir), new PathFilter(){
+            @Override
+            public boolean accept(Path path) {
+                return path.getName().startsWith("part");
+            }
+        });
+        // NOTE(review): the expected count of 5 was asserted while listing "output5";
+        // confirm it is the right number of part files for "output6", given that the
+        // comment above says the join keeps its initial parallelism.
+        assertEquals(5, files.length);
+    }
+
+    /**
+     * Runs a skewed join whose output is divided by a scalar (the row count of the
+     * first input file), stores it in "output7" and counts the "part*" files there.
+     */
+    @Test
+    public void testSkewedJoinIncreaseParallelismWithScalar() throws IOException{
+        // skewed join parallelism takes the initial setting, since the join vertex has a
+        // broadcast(scalar) dependency, which prevents it from changing parallelism
+        pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+        pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+        pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000");
+        pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
+        pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
+        pigServer.registerQuery("C = join A by name, B by name using 'skewed';");
+        // D reads the same file as A; use the shared constant instead of repeating its value.
+        pigServer.registerQuery("D = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
+        pigServer.registerQuery("E = group D all;");
+        pigServer.registerQuery("F = foreach E generate COUNT(D) as count;");
+        pigServer.registerQuery("G = foreach C generate age/F.count, gender;");
+        pigServer.store("G", "output7");
+        FileSystem fs = cluster.getFileSystem();
+        FileStatus[] files = fs.listStatus(new Path("output7"), new PathFilter(){
+            @Override
+            public boolean accept(Path path) {
+                if (path.getName().startsWith("part")) {
+                    return true;
+                }
+                return false;
+            }
+        });
         assertEquals(files.length, 4);
     }
 
     @Test
-    public void testSkewedJoinIncreaseIntermediateParallelism() throws 
IOException{
+    public void testFlattenParallelism() throws IOException{
+        // Skewed join -> group -> FLATTEN -> group. The text returned by testAutoParallelism
+        // (logging requested for TezJobCompiler and TezDagBuilder) must contain the
+        // parallelism messages asserted below.
+        final String outputDir = "/tmp/testFlattenParallelism";
+        final String loadA = "A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);";
+        final String loadB = "B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);";
+        final String pigScript = loadA + loadB
+                + "C = join A by name, B by name using 'skewed' parallel 1;"
+                + "C1 = group C by A::name;"
+                + "C2 = FOREACH C1 generate group, FLATTEN(C);"
+                + "D = group C2 by group;"
+                + "E = foreach D generate group, COUNT(C2.A::name);"
+                + "STORE E into '" + outputDir + "/finalout';";
+        final String capturedLog = testAutoParallelism(pigScript, outputDir, true, TezJobCompiler.class, TezDagBuilder.class);
+        assertTrue(capturedLog.contains("For vertex - scope-74: parallelism=10"));
+        assertTrue(capturedLog.contains("For vertex - scope-75: parallelism=70"));
+        assertTrue(capturedLog.contains("Total estimated parallelism is 89"));
+    }
+
+    /**
+     * Skewed join with "parallel 1" followed by a group: the user-specified
+     * parallelism is expected to be overridden for the intermediate step.
+     */
+    @Test
+    public void testIncreaseIntermediateParallelism1() throws IOException{
+        final String outputDir = "/tmp/testIncreaseIntermediateParallelism";
+        final String loadA = "A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);";
+        final String loadB = "B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);";
+        final String pigScript = loadA + loadB
+                + "C = join A by name, B by name using 'skewed' parallel 1;"
+                + "D = group C by A::name;"
+                + "E = foreach D generate group, COUNT(C.A::name);"
+                + "STORE E into '" + outputDir + "/finalout';";
+        final String capturedLog = testIncreaseIntermediateParallelism(pigScript, outputDir, true);
+        // Exactly one increase is expected, and it is the one for C
+        assertTrue(capturedLog.contains("Increased requested parallelism of scope-59 to 4"));
+        assertEquals(1, StringUtils.countMatches(capturedLog, "Increased requested parallelism"));
+        assertTrue(capturedLog.contains("Total estimated parallelism is 40"));
+    }
+
+    /**
+     * Same pipeline as the first variant, but the join result is also stored: the
+     * user-specified parallelism should not be overridden for an intermediate step
+     * when there is a STORE.
+     */
+    @Test
+    public void testIncreaseIntermediateParallelism2() throws IOException{
+        final String outputDir = "/tmp/testIncreaseIntermediateParallelism";
+        final String loadA = "A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);";
+        final String loadB = "B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);";
+        final String pigScript = loadA + loadB
+                + "C = join A by name, B by name using 'skewed' parallel 2;"
+                + "STORE C into '/tmp/testIncreaseIntermediateParallelism';"
+                + "D = group C by A::name parallel 2;"
+                + "E = foreach D generate group, COUNT(C.A::name);"
+                + "STORE E into '" + outputDir + "/finalout';";
+        final String capturedLog = testIncreaseIntermediateParallelism(pigScript, outputDir, true);
+        // The Split has a STORE, so no "Increased requested parallelism" message is expected
+        final int increases = StringUtils.countMatches(capturedLog, "Increased requested parallelism");
+        assertEquals(0, increases);
+    }
+
+    /**
+     * Multiple levels with default parallelism: a group by followed by another
+     * group by, ending in an order by. Default parallelism is reset to -1 afterwards.
+     */
+    @Test
+    public void testIncreaseIntermediateParallelism3() throws IOException{
+        final String outputDir = "/tmp/testIncreaseIntermediateParallelism";
+        final String loadA = "A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);";
+        final String loadB = "B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);";
+        final String pigScript = "set default_parallel 1\n"
+                + loadA + loadB
+                + "C = join A by name, B by name;"
+                + "STORE C into '/tmp/testIncreaseIntermediateParallelism';"
+                + "C1 = group C by A::name;"
+                + "C2 = FOREACH C1 generate group, FLATTEN(C);"
+                + "D = group C2 by group;"
+                + "E = foreach D generate group, COUNT(C2.A::name);"
+                + "F = order E by $0;"
+                + "STORE F into '" + outputDir + "/finalout';";
+        try {
+            final String capturedLog = testIncreaseIntermediateParallelism(pigScript, outputDir, false);
+            // Parallelism of C1 should be increased. C2 will not be increased due to order by
+            final int increases = StringUtils.countMatches(capturedLog, "Increased requested parallelism");
+            assertEquals(1, increases);
+            assertTrue(capturedLog.contains("Increased requested parallelism of scope-65 to 10"));
+            assertTrue(capturedLog.contains("Total estimated parallelism is 19"));
+        } finally {
+            // Undo the "set default_parallel 1" issued by the script
+            pigServer.setDefaultParallel(-1);
+        }
+    }
+
+    /**
+     * Delegates to testAutoParallelism, passing ParallelismSetter and TezJobCompiler
+     * as the classes to log, and returns whatever that call returns.
+     */
+    private String testIncreaseIntermediateParallelism(String script, String outputDir, boolean sortAndCheck) throws IOException {
+        final String capturedLog = testAutoParallelism(script, outputDir, sortAndCheck,
+                ParallelismSetter.class, TezJobCompiler.class);
+        return capturedLog;
+    }
+
+    private String testAutoParallelism(String script, String outputDir, 
boolean sortAndCheck, Class... classesToLog) throws IOException {
         NodeIdGenerator.reset();
         PigServer.resetScope();
         StringWriter writer = new StringWriter();
         // When there is a combiner operation involved user specified 
parallelism is overriden
-        Util.createLogAppender(ParallelismSetter.class, 
"testSkewedJoinIncreaseIntermediateParallelism", writer);
+        Util.createLogAppender("testAutoParallelism", writer, classesToLog);
         try {
             
pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION,
 "true");
             
pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE,
 "4000");
             
pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
 "80000");
-            pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as 
(name:chararray, age:int);");
-            pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as 
(name:chararray, gender:chararray);");
-            pigServer.registerQuery("C = join A by name, B by name using 
'skewed' parallel 1;");
-            pigServer.registerQuery("D = group C by A::name;");
-            pigServer.registerQuery("E = foreach D generate group, 
COUNT(C.A::name);");
-            Iterator<Tuple> iter = pigServer.openIterator("E");
+            pigServer.setBatchOn();
+            pigServer.registerScript(new 
ByteArrayInputStream(script.getBytes()));
+            pigServer.executeBatch();
+
+            pigServer.registerQuery("A = load '" + outputDir + "/finalout' as 
(name:chararray, count:long);");
+            Iterator<Tuple> iter = pigServer.openIterator("A");
+
             List<Tuple> expectedResults = Util
                     .getTuplesFromConstantTupleStrings(new String[] {
                             "('Abigail',56L)", "('Alexander',45L)", 
"('Ava',60L)",
@@ -267,11 +405,15 @@ public class TestTezAutoParallelism {
                             "('Liam',46L)", "('Madison',46L)", "('Mason',54L)",
                             "('Mia',51L)", "('Michael',47L)", "('Noah',38L)",
                             "('Olivia',50L)", "('Sophia',52L)", 
"('William',43L)" });
-
-            Util.checkQueryOutputsAfterSort(iter, expectedResults);
-            assertTrue(writer.toString().contains("Increased requested 
parallelism of scope-40 to 4"));
+            if (sortAndCheck) {
+                Util.checkQueryOutputsAfterSort(iter, expectedResults);
+            } else {
+                Util.checkQueryOutputs(iter, expectedResults);
+            }
+            return writer.toString();
         } finally {
-            Util.removeLogAppender(ParallelismSetter.class, 
"testSkewedJoinIncreaseIntermediateParallelism");
+            Util.removeLogAppender("testAutoParallelism", classesToLog);
+            Util.deleteFile(cluster, outputDir);
         }
     }
 }

Modified: pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java (original)
+++ pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java Fri Mar  4 
18:17:39 2016
@@ -33,8 +33,11 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.tez.TezLocalExecType;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainer;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerPrinter;
+import org.apache.pig.builtin.OrcStorage;
+import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.test.TestMultiQueryBasic.DummyStoreWithOutputFormat;
 import org.apache.pig.test.Util;
 import org.apache.pig.test.utils.TestHelper;
 import org.junit.AfterClass;
@@ -86,6 +89,52 @@ public class TestTezCompiler {
     }
 
     @Test
+    public void testStoreLoad() throws Exception {
+        // Store a relation, load the stored location again and store it a second time;
+        // the script is passed to run() with the TEZC-LoadStore-1 golden file.
+        final String goldenFile = "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-1.gld";
+        final String script =
+                "a = load 'file:///tmp/input' as (x:int, y:int);" +
+                "store a into 'file:///tmp/output';" +
+                "b = load 'file:///tmp/output' as (x:int, y:int);" +
+                "store b into 'file:///tmp/output1';";
+
+        run(script, goldenFile);
+    }
+
+    /**
+     * Chain of stores and loads over several directories, ending in a three-way
+     * cogroup. The golden file is chosen from the running JVM's "java.version".
+     */
+    @Test
+    public void testStoreLoadMultiple() throws Exception {
+        final String script =
+                "a = load 'file:///tmp/input';" +
+                "store a into 'file:///tmp/output/Dir1';" +
+                "a = load 'file:///tmp/output/Dir1';" +
+                "store a into 'file:///tmp/output/Dir2' using BinStorage();" +
+                "a = load 'file:///tmp/output/Dir1';" +
+                "store a into 'file:///tmp/output/Dir3';" +
+                "a = load 'file:///tmp/output/Dir2' using BinStorage();" +
+                "store a into 'file:///tmp/output/Dir4';" +
+                "a = load 'file:///tmp/output/Dir3';" +
+                "b = load 'file:///tmp/output/Dir2' using BinStorage();" +
+                "c = load 'file:///tmp/output/Dir1';" +
+                "d = cogroup a by $0, b by $0, c by $0;" +
+                "store d into 'file:///tmp/output/Dir5';";
+
+        // To get around difference in ordering of operators in plan due to JDK7 and JDK8
+        final boolean isJdk8 = System.getProperty("java.version").startsWith("1.8");
+        final String goldenFile = isJdk8
+                ? "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-2.gld"
+                : "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-2-JDK7.gld";
+        run(script, goldenFile);
+    }
+
+    /**
+     * Script containing a "native" statement (hadoop-examples.jar wordcount) between
+     * a load and a store; passed to run() with the TEZC-Native-1 golden file.
+     */
+    @Test
+    public void testNative() throws Exception {
+        final String goldenFile = "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Native-1.gld";
+        final String script =
+                "a = load 'file:///tmp/input' as (x:int, y:int);" +
+                "b = native 'hadoop-examples.jar' Store a into '/tmp/table_testNativeMRJobSimple_input' Load '/tmp/table_testNativeMRJobSimple_output' `wordcount /tmp/table_testNativeMRJobSimple_input /tmp/table_testNativeMRJobSimple_output`;" +
+                "store b into 'file:///tmp/output';";
+
+        run(script, goldenFile);
+    }
+
+    @Test
     public void testFilter() throws Exception {
         String query =
                 "a = load 'file:///tmp/input' as (x:int, y:int);" +
@@ -120,6 +169,121 @@ public class TestTezCompiler {
     }
 
     @Test
+    public void testSelfJoin() throws Exception {
+        // One load filtered three ways, then the three filtered relations joined on x.
+        final String goldenFile = "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-1.gld";
+        final String script =
+                "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+                "b = filter a by x < 5;" +
+                "c = filter a by x == 10;" +
+                "d = filter a by x > 10;" +
+                "e = join b by x, c by x, d by x;" +
+                "store e into 'file:///tmp/output';";
+
+        run(script, goldenFile);
+    }
+
+    /**
+     * Skewed join between two filtered views of the same load.
+     */
+    @Test
+    public void testSelfJoinSkewed() throws Exception {
+        final String goldenFile = "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-2.gld";
+        final String script =
+                "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+                "b = filter a by x < 5;" +
+                "c = filter a by x == 10;" +
+                "d = join b by x, c by x using 'skewed';" +
+                "store d into 'file:///tmp/output';";
+
+        run(script, goldenFile);
+    }
+
+    /**
+     * Replicated join between three filtered views of the same load.
+     */
+    @Test
+    public void testSelfJoinReplicated() throws Exception {
+        final String goldenFile = "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-3.gld";
+        final String script =
+                "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+                "b = filter a by x < 5;" +
+                "c = filter a by x == 10;" +
+                "d = filter a by x > 10;" +
+                "e = join b by x, c by x, d by x using 'replicated';" +
+                "store e into 'file:///tmp/output';";
+
+        run(script, goldenFile);
+    }
+
+    /**
+     * Replicated join of relation b with a union that itself contains b.
+     */
+    @Test
+    public void testSelfJoinUnionReplicated() throws Exception {
+        final String goldenFile = "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-4.gld";
+        final String script =
+                "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+                "b = load 'file:///tmp/input2' as (x:int, z:int);" +
+                "c = union a, b;" +
+                "d = join b by x, c by x using 'replicated';" +
+                "store d into 'file:///tmp/output';";
+
+        run(script, goldenFile);
+    }
+
+    /**
+     * Join of a load with the union of two filtered views of that same load.
+     */
+    @Test
+    public void testSelfJoinUnion() throws Exception {
+        final String goldenFile = "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-5.gld";
+        final String script =
+                "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+                "a1 = filter a by x > 5;" +
+                "a2 = filter a by x < 2;" +
+                "b = union a1, a2;" +
+                "c = join b by x, a by x;" +
+                "store c into 'file:///tmp/output';";
+
+        run(script, goldenFile);
+    }
+
+    /**
+     * Like the self-join-with-union case, but one union member is itself the
+     * projected result of a join between two filtered views of the load.
+     */
+    @Test
+    public void testSelfJoinUnionDifferentMembers() throws Exception {
+        final String goldenFile = "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-6.gld";
+        final String script =
+                "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+                "a1 = filter a by x > 5;" +
+                "a2 = filter a by x < 2;" +
+                "a3 = filter a by y == 10;" +
+                "a4 = join a2 by x, a3 by x;" +
+                "a5 = foreach a4 generate a2::x as x, a3::y as y;" +
+                "b = union a1, a5;" +
+                "c = join b by x, a by x;" +
+                "store c into 'file:///tmp/output';";
+
+        run(script, goldenFile);
+    }
+
+    /**
+     * Cross product of two loads followed by a projection.
+     */
+    @Test
+    public void testCross() throws Exception {
+        final String goldenFile = "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cross-1.gld";
+        final String script =
+                "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+                "b = load 'file:///tmp/input2' as (x:int, z:int);" +
+                "c = cross a, b;" +
+                "d = foreach c generate a::x as x, y, z;" +
+                "store d into 'file:///tmp/output';";
+
+        run(script, goldenFile);
+    }
+
+    /**
+     * Cross product of two filtered views of the same load.
+     */
+    @Test
+    public void testSelfCross() throws Exception {
+        final String goldenFile = "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cross-2.gld";
+        final String script =
+                "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+                "b = filter a by x < 5;" +
+                "c = filter a by x == 10;" +
+                "d = cross b, c;" +
+                "store d into 'file:///tmp/output';";
+
+        run(script, goldenFile);
+    }
+
+    /**
+     * Cross product whose projection also reads relation a as a scalar (a.x, a.y).
+     */
+    @Test
+    public void testCrossScalarSplit() throws Exception {
+        final String goldenFile = "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cross-3.gld";
+        final String script =
+                "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+                "b = load 'file:///tmp/input2' as (x:int, z:int);" +
+                "c = cross b, a;" +
+                "d = foreach c generate a.x, a.y, z;" + //Scalar
+                "store d into 'file:///tmp/output';";
+
+        run(script, goldenFile);
+    }
+
+    @Test
     public void testSkewedJoin() throws Exception {
         String query =
                 "a = load 'file:///tmp/input1' as (x:int, y:int);" +
@@ -132,6 +296,19 @@ public class TestTezCompiler {
     }
 
     @Test
+    public void testSkewedJoinFilter() throws Exception {
+        // Skewed join where the first input is filtered before the join.
+        final String goldenFile = "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-2.gld";
+        final String script =
+                "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+                "a = filter a by x == 1;" +
+                "b = load 'file:///tmp/input2' as (x:int, z:int);" +
+                "c = join a by x, b by x using 'skewed';" +
+                "d = foreach c generate a::x as x, y, z;" +
+                "store d into 'file:///tmp/output';";
+
+        run(script, goldenFile);
+    }
+
+    @Test
     public void testLimit() throws Exception {
         String query =
                 "a = load 'file:///tmp/input' as (x:int, y:int);" +
@@ -250,6 +427,29 @@ public class TestTezCompiler {
         run(query, 
"test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-1.gld");
     }
 
+    /**
+     * Order by applied to a filtered PigStorage load.
+     */
+    @Test
+    public void testOrderByWithFilter() throws Exception {
+        final String goldenFile = "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-2.gld";
+        final String script =
+                "a = load 'file:///tmp/input' using PigStorage(',') as (x:int, y:int);" +
+                "b = filter a by x == 1;" +
+                "c = order b by x;" +
+                "STORE c INTO 'file:///tmp/output';";
+
+        run(script, goldenFile);
+    }
+
+    /**
+     * Order by over a load function that is listed in "pig.sort.readonce.loadfuncs".
+     * The property is set only for the duration of this test.
+     */
+    @Test
+    public void testOrderByReadOnceLoadFunc() throws Exception {
+        setProperty("pig.sort.readonce.loadfuncs","org.apache.pig.backend.hadoop.hbase.HBaseStorage,org.apache.pig.backend.hadoop.accumulo.AccumuloStorage");
+        try {
+            String query =
+                    "a = load 'file:///tmp/input' using org.apache.pig.backend.hadoop.hbase.HBaseStorage(',') as (x:int, y:int);" +
+                    "b = order a by x;" +
+                    "STORE b INTO 'file:///tmp/output';";
+
+            run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-3.gld");
+        } finally {
+            // Clear the property even when run() throws, so it cannot leak into other tests
+            setProperty("pig.sort.readonce.loadfuncs", null);
+        }
+    }
+
     // PIG-3759, PIG-3781
     // Combiner should not be added in case of co-group
     @Test
@@ -370,6 +570,59 @@ public class TestTezCompiler {
     }
 
     @Test
+    public void testMultiQueryScalar() throws Exception {
+        // SPLIT whose condition reads a scalar (c.cnt) computed from the same load.
+        // Run once with PIG_OPT_MULTIQUERY set to "true" and once with "false".
+        final String script =
+                "a = load 'file:///tmp/input' as (x:int, y:int);" +
+                "b = group a by x;" +
+                "c = foreach b generate group, COUNT(a) as cnt;" +
+                "SPLIT a into d if (2 * c.cnt) < y, e OTHERWISE;" +
+                "store d into 'file:///tmp/output1';" +
+                "store e into 'file:///tmp/output2';";
+
+        setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, Boolean.toString(true));
+        run(script, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-6.gld");
+
+        resetScope();
+        setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, Boolean.toString(false));
+        run(script, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-6-OPTOFF.gld");
+    }
+
+    /**
+     * Two replicated joins over the same pair of loads, unioned and stored. Run once
+     * with PIG_OPT_MULTIQUERY set to "true" and once with "false".
+     */
+    @Test
+    public void testMultiQueryMultipleReplicateJoin() throws Exception {
+        final String script =
+                "a = load 'file:///tmp/input' as (x:int, y:int);" +
+                "b = load 'file:///tmp/input' as (x:int, y:int);" +
+                "c = join a by $0, b by $0 using 'replicated';" +
+                "d = join a by $1, b by $1 using 'replicated';" +
+                "e = union c,d;" +
+                "store e into 'file:///tmp/output';";
+
+        setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, Boolean.toString(true));
+        run(script, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-7.gld");
+
+        resetScope();
+        setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, Boolean.toString(false));
+        run(script, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-7-OPTOFF.gld");
+    }
+
+    /**
+     * Foreach that reads two scalars (b.b1 and c.c1), both derived from the same load
+     * as the grouped relation. Run once with PIG_OPT_MULTIQUERY set to "true" and
+     * once with "false".
+     */
+    @Test
+    public void testMultiQueryMultipleScalar() throws Exception {
+        final String script =
+                "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+                "b = filter a by x == 5;" +
+                "b = foreach b generate $0 as b1;" +
+                "c = filter a by x == 10;" +
+                "c = foreach c generate $0 as c1;" +
+                "d = group a by x;" +
+                "e = foreach d generate group, b.b1, c.c1;" +
+                "store e into 'file:///tmp/output';";
+
+        setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, Boolean.toString(true));
+        run(script, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-8.gld");
+
+        resetScope();
+        setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, Boolean.toString(false));
+        run(script, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-8-OPTOFF.gld");
+    }
+
+    @Test
     public void testUnionStore() throws Exception {
         String query =
                 "a = load 'file:///tmp/input' as (x:int, y:chararray);" +
@@ -379,11 +632,41 @@ public class TestTezCompiler {
 
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
         run(query, 
"test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1.gld");
+        resetScope();
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false);
         run(query, 
"test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-OPTOFF.gld");
     }
 
     @Test
+    public void testUnionUnSupportedStore() throws Exception {
+        // Two cases in which the plan should not have the union optimization applied:
+        // the store function is listed as unsupported, or a supported list is set that
+        // does not name the store function in use. The previous values of both lists
+        // are restored afterwards.
+        String query =
+                "a = load 'file:///tmp/input' as (x:int, y:chararray);" +
+                "b = load 'file:///tmp/input' as (y:chararray, x:int);" +
+                "c = union onschema a, b;" +
+                "store c into 'file:///tmp/output';";
+
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
+        String oldSupported = getProperty(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS);
+        String oldUnSupported = getProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS);
+        try {
+            setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS, PigStorage.class.getName());
+            // Plan should not have union optimization applied
+            run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-OPTOFF.gld");
+            resetScope();
+            setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS, null);
+            setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS, OrcStorage.class.getName());
+            query =
+                    "a = load 'file:///tmp/input' as (x:int, y:chararray);" +
+                    "b = load 'file:///tmp/input' as (y:chararray, x:int);" +
+                    "c = union onschema a, b;" +
+                    "store c into 'file:///tmp/output' using " + DummyStoreWithOutputFormat.class.getName() + "();";
+            // Plan should not have union optimization applied
+            run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore-OPTOFF.gld");
+        } finally {
+            // Restore the values even when a run() above fails, so the modified
+            // store-function lists cannot leak into other tests
+            setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS, oldSupported);
+            setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS, oldUnSupported);
+        }
+    }
+
+    @Test
     public void testUnionGroupBy() throws Exception {
         String query =
                 "a = load 'file:///tmp/input' as (x:int, y:int);" +
@@ -449,6 +732,8 @@ public class TestTezCompiler {
 
     @Test
     public void testUnionSkewedJoin() throws Exception {
+        // TODO: PIG-4574 optimization needs to be done for this as well.
+        // Requires changes in UnionOptimizer
         String query =
                 "a = load 'file:///tmp/input' as (x:int, y:chararray);" +
                 "b = load 'file:///tmp/input' as (y:chararray, x:int);" +
@@ -465,6 +750,8 @@ public class TestTezCompiler {
 
     @Test
     public void testUnionOrderby() throws Exception {
+        // TODO: PIG-4574 optimization needs to be done for this as well.
+        // Requires changes in UnionOptimizer
         String query =
                 "a = load 'file:///tmp/input' as (x:int, y:chararray);" +
                 "b = load 'file:///tmp/input' as (y:chararray, x:int);" +
@@ -500,7 +787,7 @@ public class TestTezCompiler {
                 "a = load 'file:///tmp/input' as (x:int, y:chararray);" +
                 "b = load 'file:///tmp/input' as (y:chararray, x:int);" +
                 "split a into a1 if x > 100, a2 otherwise;" +
-                "c = union onschema a1, b;" +
+                "c = union onschema a1, a2, b;" +
                 "split c into d if x > 500, e otherwise;" +
                 "store a2 into 'file:///tmp/output/a2';" +
                 "store d into 'file:///tmp/output/d';" +
@@ -508,6 +795,7 @@ public class TestTezCompiler {
 
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9.gld");
+        resetScope();
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9-OPTOFF.gld");
     }
@@ -521,17 +809,18 @@ public class TestTezCompiler {
                 "d = load 'file:///tmp/input1' as (x:int, y:chararray);" +
                 "e = union onschema c, d;" +
                 "f = group e by x;" +
-                "store f into 'file:///tmp/output';";
+                "store e into 'file:///tmp/output1';" +
+                "store f into 'file:///tmp/output2';";
 
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-10.gld");
         resetScope();
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-10-OPTOFF.gld");
+
     }
 
-    //TODO: union followed by union followed by store does not work.
-    //@Test
+    @Test
     public void testUnionUnionStore() throws Exception {
         String query =
                 "a = load 'file:///tmp/input' as (x:int, y:chararray);" +
@@ -549,6 +838,127 @@ public class TestTezCompiler {
     }
 
     @Test
+    public void testMultipleUnionSplitJoin() throws Exception {
+        String query =
+                "a = load 'file:///tmp/input' as (x:int, y:chararray);" +
+                "b = filter a by x == 2;" +
+                "b1 = foreach b generate *;" +
+                "b2 = foreach b generate *;" +
+                "b3 = union onschema b1, b2;" +
+                "c = filter a by x == 3;" +
+                "c1 = foreach c generate y, x;" +
+                "c2 = foreach c generate y, x;" +
+                "c3 = union c1, c2;" +
+                "a1 = union onschema b3, c3;" +
+                "store a1 into 'file:///tmp/output1';" +
+                "d = load 'file:///tmp/input1' as (x:int, z:chararray);" +
+                "e = join a1 by x, d by x using 'skewed';" +
+                "store e into 'file:///tmp/output2';";
+
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-12.gld");
+        resetScope();
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false);
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-12-OPTOFF.gld");
+    }
+
+    @Test
+    public void testUnionSplitReplicateJoin() throws Exception {
+        String query =
+                "a = load 'file:///tmp/input' as (x:int, y:chararray);" +
+                "b = filter a by x == 2;" +
+                "c = union onschema a, b;" +
+                "d = load 'file:///tmp/input1' as (x:int, z:chararray);" +
+                "e = join c by x, d by x using 'replicated';" +
+                "store e into 'file:///tmp/output';";
+
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-13.gld");
+        resetScope();
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false);
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-13-OPTOFF.gld");
+
+        query =
+                "a = load 'file:///tmp/input' as (x:int, y:chararray);" +
+                "b = filter a by x == 2;" +
+                "c = union onschema a, b;" +
+                "d = load 'file:///tmp/input1' as (x:int, z:chararray);" +
+                "e = join d by x, c by x using 'replicated';" +
+                "store e into 'file:///tmp/output';";
+
+        resetScope();
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-14.gld");
+        resetScope();
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false);
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-14-OPTOFF.gld");
+    }
+
+    @Test
+    public void testUnionSplitSkewedJoin() throws Exception {
+        String query =
+                "a = load 'file:///tmp/input' as (x:int, y:chararray);" +
+                "b = filter a by x == 2;" +
+                "c = union onschema a, b;" +
+                "d = load 'file:///tmp/input1' as (x:int, z:chararray);" +
+                "e = join c by x, d by x using 'skewed';" +
+                "store e into 'file:///tmp/output';";
+
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-15.gld");
+        resetScope();
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false);
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-15-OPTOFF.gld");
+
+        query =
+                "a = load 'file:///tmp/input' as (x:int, y:chararray);" +
+                "b = filter a by x == 2;" +
+                "c = union onschema a, b;" +
+                "d = load 'file:///tmp/input1' as (x:int, z:chararray);" +
+                "e = join d by x, c by x using 'skewed';" +
+                "store e into 'file:///tmp/output';";
+
+        resetScope();
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16.gld");
+        resetScope();
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false);
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16-OPTOFF.gld");
+    }
+
+    @Test
+    public void testUnionScalar() throws Exception {
+        String query =
+                "a = load 'file:///tmp/input' as (x:int, y:chararray);" +
+                "b = load 'file:///tmp/input' as (y:chararray, x:int);" +
+                "c = union onschema a, b;" +
+                "d = load 'file:///tmp/input1' as (x:int, z:chararray);" +
+                "e = filter c by x == d.x;" +
+                "store e into 'file:///tmp/output';";
+
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-17.gld");
+        resetScope();
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false);
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-17-OPTOFF.gld");
+
+        query =
+                "a = load 'file:///tmp/input' as (x:int, y:chararray);" +
+                "b = load 'file:///tmp/input' as (y:chararray, x:int);" +
+                "c = union onschema a, b;" +
+                "d = load 'file:///tmp/input1' as (x:int, z:chararray);" +
+                "e = filter d by x == c.x;" +
+                "store e into 'file:///tmp/output';";
+
+        resetScope();
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-18.gld");
+        resetScope();
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false);
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-18-OPTOFF.gld");
+    }
+
+    @Test
     public void testRank() throws Exception {
         String query =
                 "a = load 'file:///tmp/input1' as (x:int, y:int);" +
@@ -569,8 +979,16 @@ public class TestTezCompiler {
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Rank-2.gld");
     }
 
+    private String getProperty(String property) {
+        return pigServer.getPigContext().getProperties().getProperty(property);
+    }
+
     private void setProperty(String property, String value) {
-        pigServer.getPigContext().getProperties().setProperty(property, value);
+        if (value == null) {
+            pigServer.getPigContext().getProperties().remove(property);
+        } else {
+            pigServer.getPigContext().getProperties().setProperty(property, value);
+        }
     }
 
     private void run(String query, String expectedFile) throws Exception {

Modified: pig/branches/spark/test/org/apache/pig/tez/TestTezJobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestTezJobControlCompiler.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/tez/TestTezJobControlCompiler.java (original)
+++ pig/branches/spark/test/org/apache/pig/tez/TestTezJobControlCompiler.java Fri Mar  4 18:17:39 2016
@@ -238,8 +238,9 @@ public class TestTezJobControlCompiler {
                 + "store e into 'output';";
         Pair<TezOperPlan, DAG> compiledPlan = compile(query);
         TezOperator leafOper = compiledPlan.first.getLeaves().get(0);
+        assertTrue(leafOper.isUseGraceParallelism());
         Vertex leafVertex = compiledPlan.second.getVertex(leafOper.getOperatorKey().toString());
-        assertEquals(leafVertex.getParallelism(), 70);
+        assertEquals(leafVertex.getParallelism(), -1);
     }
 
     @Test
@@ -271,8 +272,9 @@ public class TestTezJobControlCompiler {
         List<TezOperator> leaves = compiledPlan.first.getLeaves();
         Collections.sort(leaves);
         TezOperator leafOper = leaves.get(1);
+        assertTrue(leafOper.isUseGraceParallelism());
         Vertex leafVertex = compiledPlan.second.getVertex(leafOper.getOperatorKey().toString());
-        assertEquals(leafVertex.getParallelism(), 7);
+        assertEquals(leafVertex.getParallelism(), -1);
     }
 
     @Test

Modified: pig/branches/spark/test/org/apache/pig/tez/TestTezLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestTezLauncher.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/tez/TestTezLauncher.java (original)
+++ pig/branches/spark/test/org/apache/pig/tez/TestTezLauncher.java Fri Mar  4 18:17:39 2016
@@ -20,10 +20,17 @@ package org.apache.pig.tez;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.util.Arrays;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezExecType;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezLauncher;
+import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.test.MiniGenericCluster;
 import org.apache.pig.test.Util;
@@ -54,8 +61,8 @@ public class TestTezLauncher {
     };
 
     private static final String OUTPUT_FILE = "TestTezLauncherOutput";
-    private static final String[] OUTPUT_RECORDS = {
-        "all\t{(apple),(pear),(pear),(strawberry),(orange)}"
+    private static final String[] OUTPUT_RECORDS = new String[] {
+        "(apple)", "(pear)", "(pear)", "(strawberry)", "(orange)"
     };
 
     @BeforeClass
@@ -94,16 +101,34 @@ public class TestTezLauncher {
         PigStats pigStats = launcher.launchPig(pp, "testRun1", pc);
         assertTrue(pigStats.isSuccessful());
 
-        String[] output = Util.readOutput(cluster.getFileSystem(), OUTPUT_FILE);
-        for (int i = 0; i < output.length; i++) {
-            assertEquals(OUTPUT_RECORDS[i], output[i]);
-        }
-
         assertEquals(1, pigStats.getInputStats().size());
         assertEquals(INPUT_FILE, pigStats.getInputStats().get(0).getName());
 
         assertEquals(1, pigStats.getOutputStats().size());
         assertEquals(OUTPUT_FILE, pigStats.getOutputStats().get(0).getName());
+
+        query = "m = load '" + OUTPUT_FILE + "' as (a:chararray, b:{(y:chararray)});";
+        pigServer = new PigServer(pc);
+        pigServer.registerQuery(query);
+        Iterator<Tuple> iter = pigServer.openIterator("m");
+        Tuple result = iter.next();
+        assertEquals(result.get(0).toString(), "all");
+        Iterator<Tuple> innerIter = ((DataBag)result.get(1)).iterator();
+        int count = 0;
+        while (innerIter.hasNext()) {
+            assertTrue(Arrays.asList(OUTPUT_RECORDS).contains(innerIter.next().toString()));
+            count++;
+        }
+        assertEquals(count, OUTPUT_RECORDS.length);
+    }
+
+    @Test
+    public void testQueueName() throws Exception {
+        Configuration conf = new Configuration();
+        conf.set("tez.queue.name", "special");
+        conf = MRToTezHelper.getDAGAMConfFromMRConf(conf);
+        assertEquals(conf.get("tez.queue.name"), "special");
+        
     }
 }
 

Modified: pig/branches/spark/test/perf/pigmix/bin/runpigmix.pl
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/perf/pigmix/bin/runpigmix.pl?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/perf/pigmix/bin/runpigmix.pl (original)
+++ pig/branches/spark/test/perf/pigmix/bin/runpigmix.pl Fri Mar  4 18:17:39 2016
@@ -1,8 +1,8 @@
 #!/usr/local/bin/perl -w
 
-if(scalar(@ARGV) < 6 )
+if(scalar(@ARGV) < 6)
 {
-    print STDERR "Usage: $0 <pig_home> <pig_bin> <pigmix_jar> <hadoop_home> <hadoop_bin> <pig mix scripts dir> <hdfs_root> <pigmix_output> [parallel] [numruns] [runmapreduce] \n";
+    print STDERR "Usage: $0 <pig_home> <pig_bin> <pigmix_jar> <hadoop_home> <hadoop_bin> <pig mix scripts dir> [hdfs_root] [pigmix_output] [parallel] [numruns] [runmapreduce] [cleanup_after_test]\n";
     exit(-1);
 }
 my $pighome = shift;
@@ -16,7 +16,14 @@ my $pigmixoutput = shift;
 my $parallel = shift;
 my $runs = shift;
 my $runmapreduce = shift;
+my $cleanup_after_test = shift;
 my $pigjar = "$pighome/pig-withouthadoop.jar";
+if(!defined($hdfsroot)) {
+    $hdfsroot = '/user/pig/tests/data/pigmix';
+}
+if(!defined($pigmixoutput)) {
+    $pigmixoutput = 'output';
+}
 if(!defined($parallel)) {
     $parallel = 40;
 }
@@ -26,6 +33,9 @@ if(!defined($runs)) {
 if(!defined($runmapreduce)) {
     $runmapreduce = 1;
 }
+if(!defined($cleanup_after_test)) {
+    $cleanup_after_test = 0;
+}
 
 $ENV{'HADOOP_HOME'} = $hadoophome;
 $ENV{'HADOOP_CLIENT_OPTS'}="-Xmx1024m";
@@ -110,5 +120,9 @@ sub cleanup {
     print STDERR `$cmd 2>&1`;
     $cmd = "$pigbin -e rmf tmp";
     print STDERR `$cmd 2>&1`;
+    if ($cleanup_after_test) {
+        $cmd = "$hadoopbin fs -rmr $pigmixoutput";
+        print STDERR `$cmd 2>&1`;
+    }
 }
 



Reply via email to