Modified: pig/branches/spark/test/org/apache/pig/test/TestCubeOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestCubeOperator.java?rev=1654955&r1=1654954&r2=1654955&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestCubeOperator.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestCubeOperator.java Tue Jan 27 02:27:45 2015
@@ -39,6 +39,7 @@ import org.apache.pig.impl.logicalLayer.
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.newplan.Operator;
 import org.junit.AfterClass;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -53,27 +54,27 @@ public class TestCubeOperator {
 
     @BeforeClass
     public static void oneTimeSetUp() throws Exception {
-        pigServer = new PigServer("local");
+        pigServer = new PigServer(Util.getLocalTestMode());
     }
 
     @Before
     public void setUp() throws Exception {
 
-       data = resetData(pigServer);
-       data.set("input", tuple("dog", "miami", 12), tuple("cat", "miami", 18),
-               tuple("turtle", "tampa", 4), tuple("dog", "tampa", 14), tuple("cat", "naples", 9),
-               tuple("dog", "naples", 5), tuple("turtle", "naples", 1));
-
-       data.set("input1", tuple("u1,men,green,mango"), tuple("u2,men,red,mango"),
-               tuple("u3,men,green,apple"), tuple("u4,women,red,mango"),
-               tuple("u6,women,green,mango"), tuple("u7,men,red,apple"),
-               tuple("u8,men,green,mango"), tuple("u9,women,red,apple"),
-               tuple("u10,women,green,apple"), tuple("u11,men,red,apple"),
-               tuple("u12,women,green,mango"));
+        data = resetData(pigServer);
+        data.set("input", tuple("dog", "miami", 12), tuple("cat", "miami", 18),
+                tuple("turtle", "tampa", 4), tuple("dog", "tampa", 14), tuple("cat", "naples", 9),
+                tuple("dog", "naples", 5), tuple("turtle", "naples", 1));
+
+        data.set("input1", tuple("u1,men,green,mango"), tuple("u2,men,red,mango"),
+                tuple("u3,men,green,apple"), tuple("u4,women,red,mango"),
+                tuple("u6,women,green,mango"), tuple("u7,men,red,apple"),
+                tuple("u8,men,green,mango"), tuple("u9,women,red,apple"),
+                tuple("u10,women,green,apple"), tuple("u11,men,red,apple"),
+                tuple("u12,women,green,mango"));
 
-       data.set("input2", tuple("dog", "miami", "white", "pet", 5));
+        data.set("input2", tuple("dog", "miami", "white", "pet", 5));
 
-       data.set("input3", tuple("dog", "miami", 12), tuple(null, "miami", 18));
+        data.set("input3", tuple("dog", "miami", 12), tuple(null, "miami", 18));
 
     }
 
@@ -83,533 +84,637 @@ public class TestCubeOperator {
 
     @Test
     public void testCubeBasic() throws IOException {
-       // basic correctness test
-       String query = "a = load 'input' USING mock.Storage() as (x:chararray,y:chararray,z:long);"
-               + "b = cube a by cube(x,y);"
-               + "c = foreach b generate flatten(group) as (type,location), COUNT_STAR(cube) as count, SUM(cube.z) as total;\n"
-               + "store c into 'output' using mock.Storage();";
-       Util.registerMultiLineQuery(pigServer, query);
-
-       Set<Tuple> expected = ImmutableSet.of(
-               tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1, (long) 18)),
-               tf.newTuple(Lists.newArrayList("cat", "naples", (long) 1, (long) 9)),
-               tf.newTuple(Lists.newArrayList("cat", null, (long) 2, (long) 27)),
-               tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, (long) 12)),
-               tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, (long) 14)),
-               tf.newTuple(Lists.newArrayList("dog", "naples", (long) 1, (long) 5)),
-               tf.newTuple(Lists.newArrayList("dog", null, (long) 3, (long) 31)),
-               tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1, (long) 4)),
-               tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1, (long) 1)),
-               tf.newTuple(Lists.newArrayList("turtle", null, (long) 2, (long) 5)),
-               tf.newTuple(Lists.newArrayList(null, "miami", (long) 2, (long) 30)),
-               tf.newTuple(Lists.newArrayList(null, "tampa", (long) 2, (long) 18)),
-               tf.newTuple(Lists.newArrayList(null, "naples", (long) 3, (long) 15)),
-               tf.newTuple(Lists.newArrayList(null, null, (long) 7, (long) 63)));
-
-       List<Tuple> out = data.get("output");
-       for (Tuple tup : out) {
-           assertTrue(expected + " contains " + tup, expected.contains(tup));
-       }
+        // basic correctness test
+        String query = "a = load 'input' USING mock.Storage() as (x:chararray,y:chararray,z:long);"
+                    + "b = cube a by cube(x,y);"
+                + "c = foreach b generate flatten(group) as (type,location), COUNT_STAR(cube) as count, SUM(cube.z) as total;\n"
+                + "store c into 'output' using mock.Storage();";
+        Util.registerMultiLineQuery(pigServer, query);
+
+        Set<Tuple> expected = ImmutableSet.of(
+                tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1, (long) 18)),
+                tf.newTuple(Lists.newArrayList("cat", "naples", (long) 1, (long) 9)),
+                tf.newTuple(Lists.newArrayList("cat", null, (long) 2, (long) 27)),
+                tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, (long) 12)),
+                tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, (long) 14)),
+                tf.newTuple(Lists.newArrayList("dog", "naples", (long) 1, (long) 5)),
+                tf.newTuple(Lists.newArrayList("dog", null, (long) 3, (long) 31)),
+                tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1, (long) 4)),
+                tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1, (long) 1)),
+                tf.newTuple(Lists.newArrayList("turtle", null, (long) 2, (long) 5)),
+                tf.newTuple(Lists.newArrayList(null, "miami", (long) 2, (long) 30)),
+                tf.newTuple(Lists.newArrayList(null, "tampa", (long) 2, (long) 18)),
+                tf.newTuple(Lists.newArrayList(null, "naples", (long) 3, (long) 15)),
+                tf.newTuple(Lists.newArrayList(null, null, (long) 7, (long) 63)));
+
+        List<Tuple> out = data.get("output");
+        for (Tuple tup : out) {
+            assertTrue(expected + " contains " + tup, expected.contains(tup));
+        }
 
     }
 
     @Test
     public void testRollupBasic() throws IOException {
-       // basic correctness test
-       String query = "a = load 'input' USING mock.Storage() as 
(x:chararray,y:chararray,z:long);"
-               + "b = cube a by rollup(x,y);"
-               + "c = foreach b generate flatten(group) as (type,location), 
COUNT_STAR(cube) as count, SUM(cube.z) as total;"
-               + "store c into 'output' using mock.Storage();";
-       Util.registerMultiLineQuery(pigServer, query);
-
-       Set<Tuple> expected = ImmutableSet.of(
-               tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1, (long) 
18)),
-               tf.newTuple(Lists.newArrayList("cat", "naples", (long) 1, 
(long) 9)),
-               tf.newTuple(Lists.newArrayList("cat", null, (long) 2, (long) 
27)),
-               tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, (long) 
12)),
-               tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, (long) 
14)),
-               tf.newTuple(Lists.newArrayList("dog", "naples", (long) 1, 
(long) 5)),
-               tf.newTuple(Lists.newArrayList("dog", null, (long) 3, (long) 
31)),
-               tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1, 
(long) 4)),
-               tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1, 
(long) 1)),
-               tf.newTuple(Lists.newArrayList("turtle", null, (long) 2, (long) 
5)),
-               tf.newTuple(Lists.newArrayList(null, null, (long) 7, (long) 
63)));
-
-       List<Tuple> out = data.get("output");
-       for (Tuple tup : out) {
-           assertTrue(expected + " contains " + tup, expected.contains(tup));
-       }
+        // basic correctness test
+        String query = "a = load 'input' USING mock.Storage() as 
(x:chararray,y:chararray,z:long);"
+                + "b = cube a by rollup(x,y);"
+                + "c = foreach b generate flatten(group) as (type,location), 
COUNT_STAR(cube) as count, SUM(cube.z) as total;"
+                + "store c into 'output' using mock.Storage();";
+        Util.registerMultiLineQuery(pigServer, query);
+
+        Set<Tuple> expected = ImmutableSet.of(
+                tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1, 
(long) 18)),
+                tf.newTuple(Lists.newArrayList("cat", "naples", (long) 1, 
(long) 9)),
+                tf.newTuple(Lists.newArrayList("cat", null, (long) 2, (long) 
27)),
+                tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, 
(long) 12)),
+                tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, 
(long) 14)),
+                tf.newTuple(Lists.newArrayList("dog", "naples", (long) 1, 
(long) 5)),
+                tf.newTuple(Lists.newArrayList("dog", null, (long) 3, (long) 
31)),
+                tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1, 
(long) 4)),
+                tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1, 
(long) 1)),
+                tf.newTuple(Lists.newArrayList("turtle", null, (long) 2, 
(long) 5)),
+                tf.newTuple(Lists.newArrayList(null, null, (long) 7, (long) 
63)));
+
+        List<Tuple> out = data.get("output");
+        for (Tuple tup : out) {
+            assertTrue(expected + " contains " + tup, expected.contains(tup));
+        }
+    }
+
+    @Test
+    public void testRollupHIIBasic() throws IOException {
+        // basic correctness test
+        String query = "a = load 'input' USING mock.Storage() as 
(x:chararray,y:chararray,z:long);"
+                + "b = cube a by rollup(x,y) pivot 1;"
+                + "c = foreach b generate flatten(group) as (type,location), 
COUNT_STAR(cube) as count, SUM(cube.z) as total;"
+                + "store c into 'output' using mock.Storage();";
+        Util.registerMultiLineQuery(pigServer, query);
+
+        Set<Tuple> expected = ImmutableSet.of(
+                tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1, 
(long) 18)),
+                tf.newTuple(Lists.newArrayList("cat", "naples", (long) 1, 
(long) 9)),
+                tf.newTuple(Lists.newArrayList("cat", null, (long) 2, (long) 
27)),
+                tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, 
(long) 12)),
+                tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, 
(long) 14)),
+                tf.newTuple(Lists.newArrayList("dog", "naples", (long) 1, 
(long) 5)),
+                tf.newTuple(Lists.newArrayList("dog", null, (long) 3, (long) 
31)),
+                tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1, 
(long) 4)),
+                tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1, 
(long) 1)),
+                tf.newTuple(Lists.newArrayList("turtle", null, (long) 2, 
(long) 5)),
+                tf.newTuple(Lists.newArrayList(null, null, (long) 7, (long) 
63)));
+
+        List<Tuple> out = data.get("output");
+        for (Tuple tup : out) {
+            assertTrue(expected + " contains " + tup, expected.contains(tup));
+        }
     }
 
     @Test
     public void testCubeAndRollup() throws IOException {
-       // basic correctness test
-       String query = "a = load 'input2' USING mock.Storage() as 
(v:chararray,w:chararray,x:chararray,y:chararray,z:long);"
-               + "b = cube a by cube(v,w), rollup(x,y);"
-               + "c = foreach b generate flatten(group) as 
(type,location,color,category), COUNT_STAR(cube) as count, SUM(cube.z) as 
total;"
-               + "store c into 'output' using mock.Storage();";
-       Util.registerMultiLineQuery(pigServer, query);
-
-       Set<Tuple> expected = ImmutableSet
-               .of(tf.newTuple(Lists.newArrayList("dog", "miami", "white", 
"pet", (long) 1,
-                       (long) 5)), tf.newTuple(Lists.newArrayList("dog", null, 
"white", "pet",
-                       (long) 1, (long) 5)), 
tf.newTuple(Lists.newArrayList(null, "miami",
-                       "white", "pet", (long) 1, (long) 5)), 
tf.newTuple(Lists.newArrayList(null,
-                       null, "white", "pet", (long) 1, (long) 5)), 
tf.newTuple(Lists.newArrayList(
-                       "dog", "miami", "white", null, (long) 1, (long) 5)), 
tf.newTuple(Lists
-                       .newArrayList("dog", null, "white", null, (long) 1, 
(long) 5)), tf
-                       .newTuple(Lists.newArrayList(null, "miami", "white", 
null, (long) 1,
-                               (long) 5)), 
tf.newTuple(Lists.newArrayList(null, null, "white",
-                       null, (long) 1, (long) 5)), 
tf.newTuple(Lists.newArrayList("dog", "miami",
-                       null, null, (long) 1, (long) 5)), 
tf.newTuple(Lists.newArrayList("dog",
-                       null, null, null, (long) 1, (long) 5)), 
tf.newTuple(Lists.newArrayList(
-                       null, "miami", null, null, (long) 1, (long) 5)), 
tf.newTuple(Lists
-                       .newArrayList(null, null, null, null, (long) 1, (long) 
5)));
-
-       List<Tuple> out = data.get("output");
-       for (Tuple tup : out) {
-           assertTrue(expected + " contains " + tup, expected.contains(tup));
-       }
+        // basic correctness test
+        String query = "a = load 'input2' USING mock.Storage() as 
(v:chararray,w:chararray,x:chararray,y:chararray,z:long);"
+                + "b = cube a by cube(v,w), rollup(x,y);"
+                + "c = foreach b generate flatten(group) as 
(type,location,color,category), COUNT_STAR(cube) as count, SUM(cube.z) as 
total;"
+                + "store c into 'output' using mock.Storage();";
+        Util.registerMultiLineQuery(pigServer, query);
+
+        Set<Tuple> expected = ImmutableSet
+                .of(tf.newTuple(Lists.newArrayList("dog", "miami", "white", 
"pet", (long) 1,
+                        (long) 5)), tf.newTuple(Lists.newArrayList("dog", 
null, "white", "pet",
+                        (long) 1, (long) 5)), 
tf.newTuple(Lists.newArrayList(null, "miami",
+                        "white", "pet", (long) 1, (long) 5)), 
tf.newTuple(Lists.newArrayList(null,
+                        null, "white", "pet", (long) 1, (long) 5)), 
tf.newTuple(Lists.newArrayList(
+                        "dog", "miami", "white", null, (long) 1, (long) 5)), 
tf.newTuple(Lists
+                        .newArrayList("dog", null, "white", null, (long) 1, 
(long) 5)), tf
+                        .newTuple(Lists.newArrayList(null, "miami", "white", 
null, (long) 1,
+                                (long) 5)), 
tf.newTuple(Lists.newArrayList(null, null, "white",
+                        null, (long) 1, (long) 5)), 
tf.newTuple(Lists.newArrayList("dog", "miami",
+                        null, null, (long) 1, (long) 5)), 
tf.newTuple(Lists.newArrayList("dog",
+                        null, null, null, (long) 1, (long) 5)), 
tf.newTuple(Lists.newArrayList(
+                        null, "miami", null, null, (long) 1, (long) 5)), 
tf.newTuple(Lists
+                        .newArrayList(null, null, null, null, (long) 1, (long) 
5)));
+
+        List<Tuple> out = data.get("output");
+        for (Tuple tup : out) {
+            assertTrue(expected + " contains " + tup, expected.contains(tup));
+        }
+
+    }
+
+    @Test
+    public void testCubeAndRollupHII() throws IOException {
+        // basic correctness test
+        String query = "a = load 'input2' USING mock.Storage() as 
(v:chararray,w:chararray,x:chararray,y:chararray,z:long);"
+                + "b = cube a by cube(v,w), rollup(x,y) pivot 1;"
+                + "c = foreach b generate flatten(group) as 
(type,location,color,category), COUNT_STAR(cube) as count, SUM(cube.z) as 
total;"
+                + "store c into 'output' using mock.Storage();";
+        Util.registerMultiLineQuery(pigServer, query);
+
+        Set<Tuple> expected = ImmutableSet
+                .of(tf.newTuple(Lists.newArrayList("dog", "miami", "white", 
"pet", (long) 1,
+                        (long) 5)), tf.newTuple(Lists.newArrayList("dog", 
null, "white", "pet",
+                        (long) 1, (long) 5)), 
tf.newTuple(Lists.newArrayList(null, "miami",
+                        "white", "pet", (long) 1, (long) 5)), 
tf.newTuple(Lists.newArrayList(null,
+                        null, "white", "pet", (long) 1, (long) 5)), 
tf.newTuple(Lists.newArrayList(
+                        "dog", "miami", "white", null, (long) 1, (long) 5)), 
tf.newTuple(Lists
+                        .newArrayList("dog", null, "white", null, (long) 1, 
(long) 5)), tf
+                        .newTuple(Lists.newArrayList(null, "miami", "white", 
null, (long) 1,
+                                (long) 5)), 
tf.newTuple(Lists.newArrayList(null, null, "white",
+                        null, (long) 1, (long) 5)), 
tf.newTuple(Lists.newArrayList("dog", "miami",
+                        null, null, (long) 1, (long) 5)), 
tf.newTuple(Lists.newArrayList("dog",
+                        null, null, null, (long) 1, (long) 5)), 
tf.newTuple(Lists.newArrayList(
+                        null, "miami", null, null, (long) 1, (long) 5)), 
tf.newTuple(Lists
+                        .newArrayList(null, null, null, null, (long) 1, (long) 
5)));
+
+        List<Tuple> out = data.get("output");
+        for (Tuple tup : out) {
+            assertTrue(expected + " contains " + tup, expected.contains(tup));
+        }
 
     }
 
     @Test
     public void testCubeMultipleIAliases() throws IOException {
-       // test for input alias to cube being assigned multiple times
-       String query = "a = load 'input' USING mock.Storage() as 
(x:chararray,y:chararray,z:long);"
-               + "a = load 'input' USING mock.Storage() as 
(x,y:chararray,z:long);"
-               + "a = load 'input' USING mock.Storage() as 
(x:chararray,y:chararray,z:long);"
-               + "b = cube a by cube(x,y);"
-               + "c = foreach b generate flatten(group) as (type,location), 
COUNT_STAR(cube) as count, SUM(cube.z) as total;"
-               + "store c into 'output' using mock.Storage();";
-
-       Util.registerMultiLineQuery(pigServer, query);
-
-       Set<Tuple> expected = ImmutableSet.of(
-               tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1, (long) 
18)),
-               tf.newTuple(Lists.newArrayList("cat", "naples", (long) 1, 
(long) 9)),
-               tf.newTuple(Lists.newArrayList("cat", null, (long) 2, (long) 
27)),
-               tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, (long) 
12)),
-               tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, (long) 
14)),
-               tf.newTuple(Lists.newArrayList("dog", "naples", (long) 1, 
(long) 5)),
-               tf.newTuple(Lists.newArrayList("dog", null, (long) 3, (long) 
31)),
-               tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1, 
(long) 4)),
-               tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1, 
(long) 1)),
-               tf.newTuple(Lists.newArrayList("turtle", null, (long) 2, (long) 
5)),
-               tf.newTuple(Lists.newArrayList(null, "miami", (long) 2, (long) 
30)),
-               tf.newTuple(Lists.newArrayList(null, "tampa", (long) 2, (long) 
18)),
-               tf.newTuple(Lists.newArrayList(null, "naples", (long) 3, (long) 
15)),
-               tf.newTuple(Lists.newArrayList(null, null, (long) 7, (long) 
63)));
-
-       List<Tuple> out = data.get("output");
-       for (Tuple tup : out) {
-           assertTrue(expected + " contains " + tup, expected.contains(tup));
-       }
+        // test for input alias to cube being assigned multiple times
+        String query = "a = load 'input' USING mock.Storage() as 
(x:chararray,y:chararray,z:long);"
+                + "a = load 'input' USING mock.Storage() as 
(x,y:chararray,z:long);"
+                + "a = load 'input' USING mock.Storage() as 
(x:chararray,y:chararray,z:long);"
+                + "b = cube a by cube(x,y);"
+                + "c = foreach b generate flatten(group) as (type,location), 
COUNT_STAR(cube) as count, SUM(cube.z) as total;"
+                + "store c into 'output' using mock.Storage();";
+
+        Util.registerMultiLineQuery(pigServer, query);
+
+        Set<Tuple> expected = ImmutableSet.of(
+                tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1, 
(long) 18)),
+                tf.newTuple(Lists.newArrayList("cat", "naples", (long) 1, 
(long) 9)),
+                tf.newTuple(Lists.newArrayList("cat", null, (long) 2, (long) 
27)),
+                tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, 
(long) 12)),
+                tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, 
(long) 14)),
+                tf.newTuple(Lists.newArrayList("dog", "naples", (long) 1, 
(long) 5)),
+                tf.newTuple(Lists.newArrayList("dog", null, (long) 3, (long) 
31)),
+                tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1, 
(long) 4)),
+                tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1, 
(long) 1)),
+                tf.newTuple(Lists.newArrayList("turtle", null, (long) 2, 
(long) 5)),
+                tf.newTuple(Lists.newArrayList(null, "miami", (long) 2, (long) 
30)),
+                tf.newTuple(Lists.newArrayList(null, "tampa", (long) 2, (long) 
18)),
+                tf.newTuple(Lists.newArrayList(null, "naples", (long) 3, 
(long) 15)),
+                tf.newTuple(Lists.newArrayList(null, null, (long) 7, (long) 
63)));
+
+        List<Tuple> out = data.get("output");
+        for (Tuple tup : out) {
+            assertTrue(expected + " contains " + tup, expected.contains(tup));
+        }
 
     }
 
     @Test
     public void testCubeAfterForeach() throws IOException {
-       // test for foreach projection before cube operator
-       String query = "a = load 'input' USING mock.Storage() as 
(x:chararray,y:chararray,z:long);"
-               + "b = foreach a generate x as type,y as location,z as number;"
-               + "c = cube b by cube(type,location);"
-               + "d = foreach c generate flatten(group) as (type,location), 
COUNT_STAR(cube) as count, SUM(cube.number) as total;"
-               + "store d into 'output' using mock.Storage();";
-
-       Util.registerMultiLineQuery(pigServer, query);
-
-       Set<Tuple> expected = ImmutableSet.of(
-               tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1, (long) 
18)),
-               tf.newTuple(Lists.newArrayList("cat", "naples", (long) 1, 
(long) 9)),
-               tf.newTuple(Lists.newArrayList("cat", null, (long) 2, (long) 
27)),
-               tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, (long) 
12)),
-               tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, (long) 
14)),
-               tf.newTuple(Lists.newArrayList("dog", "naples", (long) 1, 
(long) 5)),
-               tf.newTuple(Lists.newArrayList("dog", null, (long) 3, (long) 
31)),
-               tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1, 
(long) 4)),
-               tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1, 
(long) 1)),
-               tf.newTuple(Lists.newArrayList("turtle", null, (long) 2, (long) 
5)),
-               tf.newTuple(Lists.newArrayList(null, "miami", (long) 2, (long) 
30)),
-               tf.newTuple(Lists.newArrayList(null, "tampa", (long) 2, (long) 
18)),
-               tf.newTuple(Lists.newArrayList(null, "naples", (long) 3, (long) 
15)),
-               tf.newTuple(Lists.newArrayList(null, null, (long) 7, (long) 
63)));
-
-       List<Tuple> out = data.get("output");
-       for (Tuple tup : out) {
-           assertTrue(expected + " contains " + tup, expected.contains(tup));
-       }
+        // test for foreach projection before cube operator
+        String query = "a = load 'input' USING mock.Storage() as 
(x:chararray,y:chararray,z:long);"
+                + "b = foreach a generate x as type,y as location,z as number;"
+                + "c = cube b by cube(type,location);"
+                + "d = foreach c generate flatten(group) as (type,location), 
COUNT_STAR(cube) as count, SUM(cube.number) as total;"
+                + "store d into 'output' using mock.Storage();";
+
+        Util.registerMultiLineQuery(pigServer, query);
+
+        Set<Tuple> expected = ImmutableSet.of(
+                tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1, 
(long) 18)),
+                tf.newTuple(Lists.newArrayList("cat", "naples", (long) 1, 
(long) 9)),
+                tf.newTuple(Lists.newArrayList("cat", null, (long) 2, (long) 
27)),
+                tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, 
(long) 12)),
+                tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, 
(long) 14)),
+                tf.newTuple(Lists.newArrayList("dog", "naples", (long) 1, 
(long) 5)),
+                tf.newTuple(Lists.newArrayList("dog", null, (long) 3, (long) 
31)),
+                tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1, 
(long) 4)),
+                tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1, 
(long) 1)),
+                tf.newTuple(Lists.newArrayList("turtle", null, (long) 2, 
(long) 5)),
+                tf.newTuple(Lists.newArrayList(null, "miami", (long) 2, (long) 
30)),
+                tf.newTuple(Lists.newArrayList(null, "tampa", (long) 2, (long) 
18)),
+                tf.newTuple(Lists.newArrayList(null, "naples", (long) 3, 
(long) 15)),
+                tf.newTuple(Lists.newArrayList(null, null, (long) 7, (long) 
63)));
+
+        List<Tuple> out = data.get("output");
+        for (Tuple tup : out) {
+            assertTrue(expected + " contains " + tup, expected.contains(tup));
+        }
 
     }
 
     @Test
     public void testCubeAfterLimit() throws IOException {
-       // test for limit operator before cube operator
-       String query = "a = load 'input' USING mock.Storage() as 
(x:chararray,y:chararray,z:long);"
-               + "b = limit a 2;" + "c = cube b by cube(x,y);"
-               + "d = foreach c generate flatten(group) as (x,y), SUM(cube.z) 
as total;"
-               + "store d into 'output' using mock.Storage();";
-
-       Util.registerMultiLineQuery(pigServer, query);
-
-       Set<Tuple> expected = ImmutableSet.of(
-               tf.newTuple(Lists.newArrayList("cat", "miami", (long) 18)),
-               tf.newTuple(Lists.newArrayList("cat", null, (long) 18)),
-               tf.newTuple(Lists.newArrayList("dog", "miami", (long) 12)),
-               tf.newTuple(Lists.newArrayList("dog", null, (long) 12)),
-               tf.newTuple(Lists.newArrayList(null, "miami", (long) 30)),
-               tf.newTuple(Lists.newArrayList(null, null, (long) 30)));
-
-       List<Tuple> out = data.get("output");
-       for (Tuple tup : out) {
-           assertTrue(expected + " contains " + tup, expected.contains(tup));
-       }
+        // test for limit operator before cube operator
+        String query = "a = load 'input' USING mock.Storage() as 
(x:chararray,y:chararray,z:long);"
+                + "b = limit a 2;" + "c = cube b by cube(x,y);"
+                + "d = foreach c generate flatten(group) as (x,y), SUM(cube.z) 
as total;"
+                + "store d into 'output' using mock.Storage();";
+
+        Util.registerMultiLineQuery(pigServer, query);
+
+        Set<Tuple> expected = ImmutableSet.of(
+                tf.newTuple(Lists.newArrayList("cat", "miami", (long) 18)),
+                tf.newTuple(Lists.newArrayList("cat", null, (long) 18)),
+                tf.newTuple(Lists.newArrayList("dog", "miami", (long) 12)),
+                tf.newTuple(Lists.newArrayList("dog", null, (long) 12)),
+                tf.newTuple(Lists.newArrayList(null, "miami", (long) 30)),
+                tf.newTuple(Lists.newArrayList(null, null, (long) 30)));
+
+        List<Tuple> out = data.get("output");
+        for (Tuple tup : out) {
+            assertTrue(expected + " contains " + tup, expected.contains(tup));
+        }
 
     }
 
     @Test
     public void testCubeWithStar() throws IOException {
-       // test for * (all) dimensions in cube operator
-       String query = "a = load 'input' USING mock.Storage() as 
(x:chararray,y:chararray);"
-               + "b = foreach a generate x as type,y as location;"
-               + "c = cube b by cube(*);"
-               + "d = foreach c generate flatten(group) as (type,location), 
COUNT_STAR(cube) as count;"
-               + "store d into 'output' using mock.Storage();";
-
-       Util.registerMultiLineQuery(pigServer, query);
-
-       Set<Tuple> expected = ImmutableSet.of(
-               tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1)),
-               tf.newTuple(Lists.newArrayList("cat", "naples", (long) 1)),
-               tf.newTuple(Lists.newArrayList("cat", null, (long) 2)),
-               tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1)),
-               tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1)),
-               tf.newTuple(Lists.newArrayList("dog", "naples", (long) 1)),
-               tf.newTuple(Lists.newArrayList("dog", null, (long) 3)),
-               tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1)),
-               tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1)),
-               tf.newTuple(Lists.newArrayList("turtle", null, (long) 2)),
-               tf.newTuple(Lists.newArrayList(null, "miami", (long) 2)),
-               tf.newTuple(Lists.newArrayList(null, "tampa", (long) 2)),
-               tf.newTuple(Lists.newArrayList(null, "naples", (long) 3)),
-               tf.newTuple(Lists.newArrayList(null, null, (long) 7)));
-
-       List<Tuple> out = data.get("output");
-       for (Tuple tup : out) {
-           assertTrue(expected + " contains " + tup, expected.contains(tup));
-       }
+        // test for * (all) dimensions in cube operator
+        String query = "a = load 'input' USING mock.Storage() as 
(x:chararray,y:chararray);"
+                + "b = foreach a generate x as type,y as location;"
+                + "c = cube b by cube(*);"
+                + "d = foreach c generate flatten(group) as (type,location), 
COUNT_STAR(cube) as count;"
+                + "store d into 'output' using mock.Storage();";
+
+        Util.registerMultiLineQuery(pigServer, query);
+
+        Set<Tuple> expected = ImmutableSet.of(
+                tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1)),
+                tf.newTuple(Lists.newArrayList("cat", "naples", (long) 1)),
+                tf.newTuple(Lists.newArrayList("cat", null, (long) 2)),
+                tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1)),
+                tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1)),
+                tf.newTuple(Lists.newArrayList("dog", "naples", (long) 1)),
+                tf.newTuple(Lists.newArrayList("dog", null, (long) 3)),
+                tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1)),
+                tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1)),
+                tf.newTuple(Lists.newArrayList("turtle", null, (long) 2)),
+                tf.newTuple(Lists.newArrayList(null, "miami", (long) 2)),
+                tf.newTuple(Lists.newArrayList(null, "tampa", (long) 2)),
+                tf.newTuple(Lists.newArrayList(null, "naples", (long) 3)),
+                tf.newTuple(Lists.newArrayList(null, null, (long) 7)));
+
+        List<Tuple> out = data.get("output");
+        for (Tuple tup : out) {
+            assertTrue(expected + " contains " + tup, expected.contains(tup));
+        }
 
     }
 
     @Test
     public void testCubeWithRange() throws IOException {
-       // test for range projection of dimensions in cube operator
-       String query = "a = load 'input' USING mock.Storage() as 
(x:chararray,y:chararray,z:long);"
-               + "b = foreach a generate x as type,y as location, z as number;"
-               + "c = cube b by cube($0..$1);"
-               + "d = foreach c generate flatten(group) as (type,location), 
COUNT_STAR(cube) as count, SUM(cube.number) as total;"
-               + "store d into 'output' using mock.Storage();";
-
-       Util.registerMultiLineQuery(pigServer, query);
-
-       Set<Tuple> expected = ImmutableSet.of(
-               tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1, (long) 
18)),
-               tf.newTuple(Lists.newArrayList("cat", "naples", (long) 1, 
(long) 9)),
-               tf.newTuple(Lists.newArrayList("cat", null, (long) 2, (long) 
27)),
-               tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, (long) 
12)),
-               tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, (long) 
14)),
-               tf.newTuple(Lists.newArrayList("dog", "naples", (long) 1, 
(long) 5)),
-               tf.newTuple(Lists.newArrayList("dog", null, (long) 3, (long) 
31)),
-               tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1, 
(long) 4)),
-               tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1, 
(long) 1)),
-               tf.newTuple(Lists.newArrayList("turtle", null, (long) 2, (long) 
5)),
-               tf.newTuple(Lists.newArrayList(null, "miami", (long) 2, (long) 
30)),
-               tf.newTuple(Lists.newArrayList(null, "tampa", (long) 2, (long) 
18)),
-               tf.newTuple(Lists.newArrayList(null, "naples", (long) 3, (long) 
15)),
-               tf.newTuple(Lists.newArrayList(null, null, (long) 7, (long) 
63)));
-
-       List<Tuple> out = data.get("output");
-       for (Tuple tup : out) {
-           assertTrue(expected + " contains " + tup, expected.contains(tup));
-       }
+        // test for range projection of dimensions in cube operator
+        String query = "a = load 'input' USING mock.Storage() as 
(x:chararray,y:chararray,z:long);"
+                + "b = foreach a generate x as type,y as location, z as 
number;"
+                + "c = cube b by cube($0..$1);"
+                + "d = foreach c generate flatten(group) as (type,location), 
COUNT_STAR(cube) as count, SUM(cube.number) as total;"
+                + "store d into 'output' using mock.Storage();";
+
+        Util.registerMultiLineQuery(pigServer, query);
+
+        Set<Tuple> expected = ImmutableSet.of(
+                tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1, 
(long) 18)),
+                tf.newTuple(Lists.newArrayList("cat", "naples", (long) 1, 
(long) 9)),
+                tf.newTuple(Lists.newArrayList("cat", null, (long) 2, (long) 
27)),
+                tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, 
(long) 12)),
+                tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, 
(long) 14)),
+                tf.newTuple(Lists.newArrayList("dog", "naples", (long) 1, 
(long) 5)),
+                tf.newTuple(Lists.newArrayList("dog", null, (long) 3, (long) 
31)),
+                tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1, 
(long) 4)),
+                tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1, 
(long) 1)),
+                tf.newTuple(Lists.newArrayList("turtle", null, (long) 2, 
(long) 5)),
+                tf.newTuple(Lists.newArrayList(null, "miami", (long) 2, (long) 
30)),
+                tf.newTuple(Lists.newArrayList(null, "tampa", (long) 2, (long) 
18)),
+                tf.newTuple(Lists.newArrayList(null, "naples", (long) 3, 
(long) 15)),
+                tf.newTuple(Lists.newArrayList(null, null, (long) 7, (long) 
63)));
+
+        List<Tuple> out = data.get("output");
+        for (Tuple tup : out) {
+            assertTrue(expected + " contains " + tup, expected.contains(tup));
+        }
 
     }
 
     @Test
     public void testCubeDuplicateDimensions() throws IOException {
-       // test for cube operator with duplicate dimensions
-       String query = "a = load 'input' USING mock.Storage() as 
(x:chararray,y:chararray,z:long);"
-               + "b = foreach a generate x as type,y as location, z as number;"
-               + "c = cube b by cube($0..$1,$0..$1);"
-               + "d = foreach c generate flatten(group), COUNT_STAR(cube) as 
count, SUM(cube.number) as total;"
-               + "store d into 'output' using mock.Storage();";
-
-       try {
-           Util.registerMultiLineQuery(pigServer, query);
-           pigServer.openIterator("d");
-       } catch (FrontendException e) {
-           // FEException with 'duplicate dimensions detected' message is throw
-           return;
-       }
+        // test for cube operator with duplicate dimensions
+        String query = "a = load 'input' USING mock.Storage() as 
(x:chararray,y:chararray,z:long);"
+                + "b = foreach a generate x as type,y as location, z as 
number;"
+                + "c = cube b by cube($0..$1,$0..$1);"
+                + "d = foreach c generate flatten(group), COUNT_STAR(cube) as 
count, SUM(cube.number) as total;"
+                + "store d into 'output' using mock.Storage();";
+
+        try {
+            Util.registerMultiLineQuery(pigServer, query);
+            pigServer.openIterator("d");
+        } catch (FrontendException e) {
+            // FEException with 'duplicate dimensions detected' message is 
thrown
+            return;
+        }
 
-       Assert.fail("Expected to throw an exception when duplicate dimensions 
are detected!");
+        Assert.fail("Expected to throw an exception when duplicate dimensions 
are detected!");
 
     }
 
     @Test
     public void testCubeAfterFilter() throws IOException {
-       // test for filtering before cube operator
-       String query = "a = load 'input' USING mock.Storage() as 
(x:chararray,y:chararray,z:long);"
-               + "b = filter a by x == 'dog';"
-               + "c = cube b by cube(x,y);"
-               + "d = foreach c generate flatten(group), COUNT_STAR(cube) as 
count, SUM(cube.z) as total;"
-               + "store d into 'output' using mock.Storage();";
-
-       Util.registerMultiLineQuery(pigServer, query);
-       // Iterator<Tuple> it = pigServer.openIterator("d");
-
-       Set<Tuple> expected = ImmutableSet.of(
-               tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, (long) 
12)),
-               tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, (long) 
14)),
-               tf.newTuple(Lists.newArrayList("dog", "naples", (long) 1, 
(long) 5)),
-               tf.newTuple(Lists.newArrayList("dog", null, (long) 3, (long) 
31)),
-               tf.newTuple(Lists.newArrayList(null, "miami", (long) 1, (long) 
12)),
-               tf.newTuple(Lists.newArrayList(null, "tampa", (long) 1, (long) 
14)),
-               tf.newTuple(Lists.newArrayList(null, "naples", (long) 1, (long) 
5)),
-               tf.newTuple(Lists.newArrayList(null, null, (long) 3, (long) 
31)));
-
-       List<Tuple> out = data.get("output");
-       for (Tuple tup : out) {
-           assertTrue(expected + " contains " + tup, expected.contains(tup));
-       }
+        // test for filtering before cube operator
+        String query = "a = load 'input' USING mock.Storage() as 
(x:chararray,y:chararray,z:long);"
+                + "b = filter a by x == 'dog';"
+                + "c = cube b by cube(x,y);"
+                + "d = foreach c generate flatten(group), COUNT_STAR(cube) as 
count, SUM(cube.z) as total;"
+                + "store d into 'output' using mock.Storage();";
+
+        Util.registerMultiLineQuery(pigServer, query);
+        // Iterator<Tuple> it = pigServer.openIterator("d");
+
+        Set<Tuple> expected = ImmutableSet.of(
+                tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, 
(long) 12)),
+                tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, 
(long) 14)),
+                tf.newTuple(Lists.newArrayList("dog", "naples", (long) 1, 
(long) 5)),
+                tf.newTuple(Lists.newArrayList("dog", null, (long) 3, (long) 
31)),
+                tf.newTuple(Lists.newArrayList(null, "miami", (long) 1, (long) 
12)),
+                tf.newTuple(Lists.newArrayList(null, "tampa", (long) 1, (long) 
14)),
+                tf.newTuple(Lists.newArrayList(null, "naples", (long) 1, 
(long) 5)),
+                tf.newTuple(Lists.newArrayList(null, null, (long) 3, (long) 
31)));
+
+        List<Tuple> out = data.get("output");
+        for (Tuple tup : out) {
+            assertTrue(expected + " contains " + tup, expected.contains(tup));
+        }
 
     }
 
     @Test
     public void testCubeAfterOrder() throws IOException {
-       // test for ordering before cube operator
-       String query = "a = load 'input' USING mock.Storage() as 
(x:chararray,y:chararray,z:long);"
-               + "b = order a by $2;"
-               + "c = cube b by cube(x,y);"
-               + "d = foreach c generate flatten(group), COUNT_STAR(cube) as 
count, SUM(cube.z) as total;"
-               + "store d into 'output' using mock.Storage();";
-
-       Util.registerMultiLineQuery(pigServer, query);
-
-       Set<Tuple> expected = ImmutableSet.of(
-               tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1, (long) 
18)),
-               tf.newTuple(Lists.newArrayList("cat", "naples", (long) 1, 
(long) 9)),
-               tf.newTuple(Lists.newArrayList("cat", null, (long) 2, (long) 
27)),
-               tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, (long) 
12)),
-               tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, (long) 
14)),
-               tf.newTuple(Lists.newArrayList("dog", "naples", (long) 1, 
(long) 5)),
-               tf.newTuple(Lists.newArrayList("dog", null, (long) 3, (long) 
31)),
-               tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1, 
(long) 4)),
-               tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1, 
(long) 1)),
-               tf.newTuple(Lists.newArrayList("turtle", null, (long) 2, (long) 
5)),
-               tf.newTuple(Lists.newArrayList(null, "miami", (long) 2, (long) 
30)),
-               tf.newTuple(Lists.newArrayList(null, "tampa", (long) 2, (long) 
18)),
-               tf.newTuple(Lists.newArrayList(null, "naples", (long) 3, (long) 
15)),
-               tf.newTuple(Lists.newArrayList(null, null, (long) 7, (long) 
63)));
-
-       List<Tuple> out = data.get("output");
-       for (Tuple tup : out) {
-           assertTrue(expected + " contains " + tup, expected.contains(tup));
-       }
+        // test for ordering before cube operator
+        String query = "a = load 'input' USING mock.Storage() as 
(x:chararray,y:chararray,z:long);"
+                + "b = order a by $2;"
+                + "c = cube b by cube(x,y);"
+                + "d = foreach c generate flatten(group), COUNT_STAR(cube) as 
count, SUM(cube.z) as total;"
+                + "store d into 'output' using mock.Storage();";
+
+        Util.registerMultiLineQuery(pigServer, query);
+
+        Set<Tuple> expected = ImmutableSet.of(
+                tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1, 
(long) 18)),
+                tf.newTuple(Lists.newArrayList("cat", "naples", (long) 1, 
(long) 9)),
+                tf.newTuple(Lists.newArrayList("cat", null, (long) 2, (long) 
27)),
+                tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, 
(long) 12)),
+                tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, 
(long) 14)),
+                tf.newTuple(Lists.newArrayList("dog", "naples", (long) 1, 
(long) 5)),
+                tf.newTuple(Lists.newArrayList("dog", null, (long) 3, (long) 
31)),
+                tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1, 
(long) 4)),
+                tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1, 
(long) 1)),
+                tf.newTuple(Lists.newArrayList("turtle", null, (long) 2, 
(long) 5)),
+                tf.newTuple(Lists.newArrayList(null, "miami", (long) 2, (long) 
30)),
+                tf.newTuple(Lists.newArrayList(null, "tampa", (long) 2, (long) 
18)),
+                tf.newTuple(Lists.newArrayList(null, "naples", (long) 3, 
(long) 15)),
+                tf.newTuple(Lists.newArrayList(null, null, (long) 7, (long) 
63)));
+
+        List<Tuple> out = data.get("output");
+        for (Tuple tup : out) {
+            assertTrue(expected + " contains " + tup, expected.contains(tup));
+        }
     }
 
     @Test
     public void testCubeAfterJoin() throws IOException {
-       // test for cubing on joined relations
-       String query = "a = load 'input1' USING mock.Storage() as 
(a1:chararray,b1,c1,d1); "
-               + "b = load 'input' USING mock.Storage() as 
(a2,b2,c2:long,d2:chararray);"
-               + "c = join a by a1, b by d2;"
-               + "d = cube c by cube($4,$5);"
-               + "e = foreach d generate flatten(group), COUNT_STAR(cube) as 
count, SUM(cube.c2) as total;"
-               + "store e into 'output' using mock.Storage();";
-
-       Util.registerMultiLineQuery(pigServer, query);
-
-       Set<Tuple> expected = ImmutableSet.of(
-               tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1, (long) 
18)),
-               tf.newTuple(Lists.newArrayList("cat", null, (long) 1, (long) 
18)),
-               tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, (long) 
12)),
-               tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, (long) 
14)),
-               tf.newTuple(Lists.newArrayList("dog", null, (long) 2, (long) 
26)),
-               tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1, 
(long) 4)),
-               tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1, 
(long) 1)),
-               tf.newTuple(Lists.newArrayList("turtle", null, (long) 2, (long) 
5)),
-               tf.newTuple(Lists.newArrayList(null, "miami", (long) 2, (long) 
30)),
-               tf.newTuple(Lists.newArrayList(null, "tampa", (long) 2, (long) 
18)),
-               tf.newTuple(Lists.newArrayList(null, "naples", (long) 1, (long) 
1)),
-               tf.newTuple(Lists.newArrayList(null, null, (long) 5, (long) 
49)));
-
-       List<Tuple> out = data.get("output");
-       for (Tuple tup : out) {
-           assertTrue(expected + " contains " + tup, expected.contains(tup));
-       }
+        // test for cubing on joined relations
+        String query = "a = load 'input1' USING mock.Storage() as 
(a1:chararray,b1,c1,d1); "
+                + "b = load 'input' USING mock.Storage() as 
(a2,b2,c2:long,d2:chararray);"
+                + "c = join a by a1, b by d2;"
+                + "d = cube c by cube($4,$5);"
+                + "e = foreach d generate flatten(group), COUNT_STAR(cube) as 
count, SUM(cube.c2) as total;"
+                + "store e into 'output' using mock.Storage();";
+
+        Util.registerMultiLineQuery(pigServer, query);
+
+        Set<Tuple> expected = ImmutableSet.of(
+                tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1, 
(long) 18)),
+                tf.newTuple(Lists.newArrayList("cat", null, (long) 1, (long) 
18)),
+                tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, 
(long) 12)),
+                tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, 
(long) 14)),
+                tf.newTuple(Lists.newArrayList("dog", null, (long) 2, (long) 
26)),
+                tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1, 
(long) 4)),
+                tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1, 
(long) 1)),
+                tf.newTuple(Lists.newArrayList("turtle", null, (long) 2, 
(long) 5)),
+                tf.newTuple(Lists.newArrayList(null, "miami", (long) 2, (long) 
30)),
+                tf.newTuple(Lists.newArrayList(null, "tampa", (long) 2, (long) 
18)),
+                tf.newTuple(Lists.newArrayList(null, "naples", (long) 1, 
(long) 1)),
+                tf.newTuple(Lists.newArrayList(null, null, (long) 5, (long) 
49)));
+
+        List<Tuple> out = data.get("output");
+        for (Tuple tup : out) {
+            assertTrue(expected + " contains " + tup, expected.contains(tup));
+        }
     }
 
     @Test
     public void testCubeAfterCogroup() throws IOException {
-       // test for cubing on co-grouped relation
-       String query = "a = load 'input1' USING mock.Storage() as 
(a1:chararray,b1,c1,d1); "
-               + "b = load 'input' USING mock.Storage() as 
(a2,b2,c2:long,d2:chararray);"
-               + "c = cogroup a by a1, b by d2;"
-               + "d = foreach c generate flatten(a), flatten(b);"
-               + "e = cube d by cube(a2,b2);"
-               + "f = foreach e generate flatten(group), COUNT_STAR(cube) as 
count, SUM(cube.c2) as total;"
-               + "store f into 'output' using mock.Storage();";
-
-       Util.registerMultiLineQuery(pigServer, query);
-
-       Set<Tuple> expected = ImmutableSet.of(
-               tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1, (long) 
18)),
-               tf.newTuple(Lists.newArrayList("cat", null, (long) 1, (long) 
18)),
-               tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, (long) 
12)),
-               tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, (long) 
14)),
-               tf.newTuple(Lists.newArrayList("dog", null, (long) 2, (long) 
26)),
-               tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1, 
(long) 4)),
-               tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1, 
(long) 1)),
-               tf.newTuple(Lists.newArrayList("turtle", null, (long) 2, (long) 
5)),
-               tf.newTuple(Lists.newArrayList(null, "miami", (long) 2, (long) 
30)),
-               tf.newTuple(Lists.newArrayList(null, "tampa", (long) 2, (long) 
18)),
-               tf.newTuple(Lists.newArrayList(null, "naples", (long) 1, (long) 
1)),
-               tf.newTuple(Lists.newArrayList(null, null, (long) 5, (long) 
49)));
-
-       List<Tuple> out = data.get("output");
-       for (Tuple tup : out) {
-           assertTrue(expected + " contains " + tup, expected.contains(tup));
-       }
+        // test for cubing on co-grouped relation
+        String query = "a = load 'input1' USING mock.Storage() as 
(a1:chararray,b1,c1,d1); "
+                + "b = load 'input' USING mock.Storage() as 
(a2,b2,c2:long,d2:chararray);"
+                + "c = cogroup a by a1, b by d2;"
+                + "d = foreach c generate flatten(a), flatten(b);"
+                + "e = cube d by cube(a2,b2);"
+                + "f = foreach e generate flatten(group), COUNT_STAR(cube) as 
count, SUM(cube.c2) as total;"
+                + "store f into 'output' using mock.Storage();";
+
+        Util.registerMultiLineQuery(pigServer, query);
+
+        Set<Tuple> expected = ImmutableSet.of(
+                tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1, 
(long) 18)),
+                tf.newTuple(Lists.newArrayList("cat", null, (long) 1, (long) 
18)),
+                tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, 
(long) 12)),
+                tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, 
(long) 14)),
+                tf.newTuple(Lists.newArrayList("dog", null, (long) 2, (long) 
26)),
+                tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1, 
(long) 4)),
+                tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1, 
(long) 1)),
+                tf.newTuple(Lists.newArrayList("turtle", null, (long) 2, 
(long) 5)),
+                tf.newTuple(Lists.newArrayList(null, "miami", (long) 2, (long) 
30)),
+                tf.newTuple(Lists.newArrayList(null, "tampa", (long) 2, (long) 
18)),
+                tf.newTuple(Lists.newArrayList(null, "naples", (long) 1, 
(long) 1)),
+                tf.newTuple(Lists.newArrayList(null, null, (long) 5, (long) 
49)));
+
+        List<Tuple> out = data.get("output");
+        for (Tuple tup : out) {
+            assertTrue(expected + " contains " + tup, expected.contains(tup));
+        }
     }
 
     @Test
     public void testCubeWithNULLs() throws IOException {
-       // test for dimension values with legitimate null values
-       String query = "a = load 'input3' USING mock.Storage() as 
(x:chararray,y:chararray,z:long);"
-               + "b = cube a by cube(x,y);"
-               + "c = foreach b generate flatten(group) as (type,location), 
SUM(cube.z) as total;"
-               + "store c into 'output' using mock.Storage();";
-
-       Util.registerMultiLineQuery(pigServer, query);
-
-       Set<Tuple> expected = ImmutableSet.of(
-               tf.newTuple(Lists.newArrayList("dog", "miami", (long) 12)),
-               tf.newTuple(Lists.newArrayList("dog", null, (long) 12)),
-               tf.newTuple(Lists.newArrayList(null, "miami", (long) 30)),
-               tf.newTuple(Lists.newArrayList(null, null, (long) 30)),
-               tf.newTuple(Lists.newArrayList("unknown", "miami", (long) 18)),
-               tf.newTuple(Lists.newArrayList("unknown", null, (long) 18)));
-
-       List<Tuple> out = data.get("output");
-       for (Tuple tup : out) {
-           assertTrue(expected + " contains " + tup, expected.contains(tup));
-       }
+        // test for dimension values with legitimate null values
+        String query = "a = load 'input3' USING mock.Storage() as 
(x:chararray,y:chararray,z:long);"
+                + "b = cube a by cube(x,y);"
+                + "c = foreach b generate flatten(group) as (type,location), 
SUM(cube.z) as total;"
+                + "store c into 'output' using mock.Storage();";
+
+        Util.registerMultiLineQuery(pigServer, query);
+
+        Set<Tuple> expected = ImmutableSet.of(
+                tf.newTuple(Lists.newArrayList("dog", "miami", (long) 12)),
+                tf.newTuple(Lists.newArrayList("dog", null, (long) 12)),
+                tf.newTuple(Lists.newArrayList(null, "miami", (long) 30)),
+                tf.newTuple(Lists.newArrayList(null, null, (long) 30)),
+                tf.newTuple(Lists.newArrayList("unknown", "miami", (long) 18)),
+                tf.newTuple(Lists.newArrayList("unknown", null, (long) 18)));
+
+        List<Tuple> out = data.get("output");
+        for (Tuple tup : out) {
+            assertTrue(expected + " contains " + tup, expected.contains(tup));
+        }
 
     }
 
     @Test
     public void testCubeWithNULLAndFilter() throws IOException {
-       // test for dimension values with legitimate null values
-       // followed by filter
-       String query = "a = load 'input3' USING mock.Storage() as 
(x:chararray,y:chararray,z:long);"
-               + "b = cube a by cube(x,y);"
-               + "c = foreach b generate flatten(group) as (type,location), 
SUM(cube.z) as total;"
-               + "d = filter c by type!='unknown';"
-               + "store d into 'output' using mock.Storage();";
-
-       Util.registerMultiLineQuery(pigServer, query);
-
-       Set<Tuple> expected = ImmutableSet.of(
-               tf.newTuple(Lists.newArrayList("dog", "miami", (long) 12)),
-               tf.newTuple(Lists.newArrayList("dog", null, (long) 12)));
-
-       List<Tuple> out = data.get("output");
-       for (Tuple tup : out) {
-           assertTrue(expected + " contains " + tup, expected.contains(tup));
-       }
+        // test for dimension values with legitimate null values
+        // followed by filter
+        String query = "a = load 'input3' USING mock.Storage() as 
(x:chararray,y:chararray,z:long);"
+                + "b = cube a by cube(x,y);"
+                + "c = foreach b generate flatten(group) as (type,location), 
SUM(cube.z) as total;"
+                + "d = filter c by type!='unknown';"
+                + "store d into 'output' using mock.Storage();";
+
+        Util.registerMultiLineQuery(pigServer, query);
+
+        Set<Tuple> expected = ImmutableSet.of(
+                tf.newTuple(Lists.newArrayList("dog", "miami", (long) 12)),
+                tf.newTuple(Lists.newArrayList("dog", null, (long) 12)));
+
+        List<Tuple> out = data.get("output");
+        for (Tuple tup : out) {
+            assertTrue(expected + " contains " + tup, expected.contains(tup));
+        }
 
     }
 
     @Test
     public void testRollupAfterCogroup() throws IOException {
-       // test for cubing on co-grouped relation
-       String query = "a = load 'input1' USING mock.Storage() as 
(a1:chararray,b1,c1,d1); "
-               + "b = load 'input' USING mock.Storage() as 
(a2,b2,c2:long,d2:chararray);"
-               + "c = cogroup a by a1, b by d2;"
-               + "d = foreach c generate flatten(a), flatten(b);"
-               + "e = cube d by rollup(a2,b2);"
-               + "f = foreach e generate flatten(group), COUNT(cube) as count, 
SUM(cube.c2) as total;"
-               + "store f into 'output' using mock.Storage();";
-
-       Util.registerMultiLineQuery(pigServer, query);
-
-       Set<Tuple> expected = ImmutableSet.of(
-               tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1, (long) 
18)),
-               tf.newTuple(Lists.newArrayList("cat", null, (long) 1, (long) 
18)),
-               tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, (long) 
12)),
-               tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, (long) 
14)),
-               tf.newTuple(Lists.newArrayList("dog", null, (long) 2, (long) 
26)),
-               tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1, 
(long) 4)),
-               tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1, 
(long) 1)),
-               tf.newTuple(Lists.newArrayList("turtle", null, (long) 2, (long) 
5)),
-               tf.newTuple(Lists.newArrayList(null, null, (long) 5, (long) 
49)));
-
-       List<Tuple> out = data.get("output");
-       for (Tuple tup : out) {
-           assertTrue(expected + " contains " + tup, expected.contains(tup));
-       }
+        // test for rollup on co-grouped relation
+        String query = "a = load 'input1' USING mock.Storage() as 
(a1:chararray,b1,c1,d1); "
+                + "b = load 'input' USING mock.Storage() as 
(a2,b2,c2:long,d2:chararray);"
+                + "c = cogroup a by a1, b by d2;"
+                + "d = foreach c generate flatten(a), flatten(b);"
+                + "e = cube d by rollup(a2,b2);"
+                + "f = foreach e generate flatten(group), COUNT(cube) as 
count, SUM(cube.c2) as total;"
+                + "store f into 'output' using mock.Storage();";
+
+        Util.registerMultiLineQuery(pigServer, query);
+
+        Set<Tuple> expected = ImmutableSet.of(
+                tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1, 
(long) 18)),
+                tf.newTuple(Lists.newArrayList("cat", null, (long) 1, (long) 
18)),
+                tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, 
(long) 12)),
+                tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, 
(long) 14)),
+                tf.newTuple(Lists.newArrayList("dog", null, (long) 2, (long) 
26)),
+                tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1, 
(long) 4)),
+                tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1, 
(long) 1)),
+                tf.newTuple(Lists.newArrayList("turtle", null, (long) 2, 
(long) 5)),
+                tf.newTuple(Lists.newArrayList(null, null, (long) 5, (long) 
49)));
+
+        List<Tuple> out = data.get("output");
+        for (Tuple tup : out) {
+            assertTrue(expected + " contains " + tup, expected.contains(tup));
+        }
     }
 
     @Test
-    public void testIllustrate() throws IOException {
+    public void testIllustrate() throws Exception {
        // test for illustrate
+        Assume.assumeTrue("illustrate does not work in tez (PIG-3993)", 
!Util.getLocalTestMode().toString().startsWith("TEZ"));
        String query = "a = load 'input' USING mock.Storage() as 
(a1:chararray,b1:chararray,c1:long); "
                + "b = cube a by cube(a1,b1);";
 
-       Util.registerMultiLineQuery(pigServer, query);
-       Map<Operator, DataBag> examples = pigServer.getExamples("b");
-       assertTrue(examples != null);
+        Util.registerMultiLineQuery(pigServer, query);
+        Map<Operator, DataBag> examples = pigServer.getExamples("b");
+        assertTrue(examples != null);
     }
 
     @Test
-    public void testExplainCube() throws IOException {
-       // test for explain
-       String query = "a = load 'input' USING mock.Storage() as 
(a1:chararray,b1:chararray,c1:long); "
-               + "b = cube a by cube(a1,b1);";
+    public void testRollupHIIAfterCogroup() throws IOException {
+        // test for rollup with pivot on co-grouped relation
+        String query = "a = load 'input1' USING mock.Storage() as 
(a1:chararray,b1,c1,d1); "
+                + "b = load 'input' USING mock.Storage() as 
(a2,b2,c2:long,d2:chararray);"
+                + "c = cogroup a by a1, b by d2;"
+                + "d = foreach c generate flatten(a), flatten(b);"
+                + "e = cube d by rollup(a2,b2) pivot 1;"
+                + "f = foreach e generate flatten(group), COUNT(cube) as 
count, SUM(cube.c2) as total;"
+                + "store f into 'output' using mock.Storage();";
+
+        Util.registerMultiLineQuery(pigServer, query);
+
+        Set<Tuple> expected = ImmutableSet.of(
+                tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1, 
(long) 18)),
+                tf.newTuple(Lists.newArrayList("cat", null, (long) 1, (long) 
18)),
+                tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, 
(long) 12)),
+                tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, 
(long) 14)),
+                tf.newTuple(Lists.newArrayList("dog", null, (long) 2, (long) 
26)),
+                tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1, 
(long) 4)),
+                tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1, 
(long) 1)),
+                tf.newTuple(Lists.newArrayList("turtle", null, (long) 2, 
(long) 5)),
+                tf.newTuple(Lists.newArrayList(null, null, (long) 5, (long) 
49)));
+
+        List<Tuple> out = data.get("output");
+        for (Tuple tup : out) {
+            assertTrue(expected + " contains " + tup, expected.contains(tup));
+        }
+    }
 
-       Util.registerMultiLineQuery(pigServer, query);
-       ByteArrayOutputStream baos = new ByteArrayOutputStream();
-       PrintStream ps = new PrintStream(baos);
-       pigServer.explain("b", ps);
-       assertTrue(baos.toString().contains("CubeDimensions"));
+    @Test
+    public void testExplainCube() throws IOException {
+        // test for explain
+        String query = "a = load 'input' USING mock.Storage() as 
(a1:chararray,b1:chararray,c1:long); "
+                + "b = cube a by cube(a1,b1);";
+
+        Util.registerMultiLineQuery(pigServer, query);
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        PrintStream ps = new PrintStream(baos);
+        pigServer.explain("b", ps);
+        assertTrue(baos.toString().contains("CubeDimensions"));
     }
 
     @Test
     public void testExplainRollup() throws IOException {
-       // test for explain
-       String query = "a = load 'input' USING mock.Storage() as 
(a1:chararray,b1:chararray,c1:long); "
-               + "b = cube a by rollup(a1,b1);";
+        // test for explain
+        String query = "a = load 'input' USING mock.Storage() as 
(a1:chararray,b1:chararray,c1:long); "
+                + "b = cube a by rollup(a1,b1);";
+
+        Util.registerMultiLineQuery(pigServer, query);
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        PrintStream ps = new PrintStream(baos);
+        pigServer.explain("b", ps);
+        assertTrue(baos.toString().contains("RollupDimensions"));
+    }
 
-       Util.registerMultiLineQuery(pigServer, query);
-       ByteArrayOutputStream baos = new ByteArrayOutputStream();
-       PrintStream ps = new PrintStream(baos);
-       pigServer.explain("b", ps);
-       assertTrue(baos.toString().contains("RollupDimensions"));
+    @Test
+    public void testExplainRollupHII() throws IOException {
+        // test for explain
+        String query = "a = load 'input' USING mock.Storage() as 
(a1:chararray,b1:chararray,c1:long); "
+                + "b = cube a by rollup(a1,b1) pivot 1;";
+
+        Util.registerMultiLineQuery(pigServer, query);
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        PrintStream ps = new PrintStream(baos);
+        pigServer.explain("b", ps);
+        assertTrue(baos.toString().contains("RollupDimensions"));
     }
 
     @Test
     public void testDescribe() throws IOException {
-       // test for describe
-       String query = "a = load 'input' USING mock.Storage() as 
(a1:chararray,b1:chararray,c1:long); "
-               + "b = cube a by cube(a1,b1);";
-
-       Util.registerMultiLineQuery(pigServer, query);
-       Schema sch = pigServer.dumpSchema("b");
-       for (String alias : sch.getAliases()) {
-           if (alias.compareTo("cube") == 0) {
-               assertTrue(alias.contains("cube"));
-           }
-       }
+        // test for describe
+        String query = "a = load 'input' USING mock.Storage() as 
(a1:chararray,b1:chararray,c1:long); "
+                + "b = cube a by cube(a1,b1);";
+
+        Util.registerMultiLineQuery(pigServer, query);
+        Schema sch = pigServer.dumpSchema("b");
+        for (String alias : sch.getAliases()) {
+            if (alias.compareTo("cube") == 0) {
+            assertTrue(alias.contains("cube"));
+            }
+        }
     }
 }

Modified: pig/branches/spark/test/org/apache/pig/test/TestDataBagAccess.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestDataBagAccess.java?rev=1654955&r1=1654954&r2=1654955&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestDataBagAccess.java 
(original)
+++ pig/branches/spark/test/org/apache/pig/test/TestDataBagAccess.java Tue Jan 
27 02:27:45 2015
@@ -29,7 +29,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
 
-import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.builtin.BinStorage;
@@ -52,7 +51,7 @@ public class TestDataBagAccess {
 
     @Before
     public void setUp() throws Exception{
-        pigServer = new PigServer(ExecType.LOCAL, new Properties());
+        pigServer = new PigServer(Util.getLocalTestMode(), new Properties());
     }
 
     @Test

Modified: pig/branches/spark/test/org/apache/pig/test/TestEvalPipelineLocal.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestEvalPipelineLocal.java?rev=1654955&r1=1654954&r2=1654955&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestEvalPipelineLocal.java 
(original)
+++ pig/branches/spark/test/org/apache/pig/test/TestEvalPipelineLocal.java Tue 
Jan 27 02:27:45 2015
@@ -39,7 +39,6 @@ import junit.framework.Assert;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.pig.ComparisonFunc;
 import org.apache.pig.EvalFunc;
-import org.apache.pig.ExecType;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -60,6 +59,7 @@ import org.apache.pig.impl.util.Pair;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.impl.util.Utils;
 import org.apache.pig.test.utils.Identity;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -73,7 +73,7 @@ public class TestEvalPipelineLocal {
     
     @Before
     public void setUp() throws Exception{
-        pigServer = new PigServer(ExecType.LOCAL);
+        pigServer = new PigServer(Util.getLocalTestMode());
     }
     
     static public class MyBagFunction extends EvalFunc<DataBag>{
@@ -1030,6 +1030,8 @@ public class TestEvalPipelineLocal {
     
     @Test
     public void testExplainInDotGraph() throws Exception{
+        Assume.assumeTrue("Skip this test for TEZ since TEZ does not support 
explain in dot format",
+                !Util.getLocalTestMode().toString().startsWith("TEZ"));
         pigServer.registerQuery("a = load 'voter' using " + 
PigStorage.class.getName() + "(',') as (name, age, registration, 
contributions);");
         pigServer.registerQuery("b = filter a by age < 50;");
         pigServer.registerQuery("c = group b by registration;");
@@ -1139,10 +1141,16 @@ public class TestEvalPipelineLocal {
         Schema expectedSchema = Utils.getSchemaFromString(
                     "group: bytearray");
         Assert.assertEquals(expectedSchema, dumpedSchema);
+        TupleFactory tf = TupleFactory.getInstance();
+        List<Tuple> expected = new ArrayList<Tuple>();
+        Tuple t = tf.newTuple(1);
+        t.set(0, new DataByteArray("NYSE".getBytes()));
+        expected.add(t);
+        t = tf.newTuple(1);
+        t.set(0, new DataByteArray("NASDAQ".getBytes()));
+        expected.add(t);
         Iterator<Tuple> iter = pigServer.openIterator("zzz");
-        Assert.assertTrue(iter.next().toString().equals("(NYSE)"));
-        Assert.assertTrue(iter.next().toString().equals("(NASDAQ)"));
-        Assert.assertFalse(iter.hasNext());
+        Util.checkQueryOutputsAfterSort(iter, expected);
     }
     
     // Self cross, see PIG-3292

Modified: pig/branches/spark/test/org/apache/pig/test/TestFetch.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestFetch.java?rev=1654955&r1=1654954&r2=1654955&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestFetch.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestFetch.java Tue Jan 27 
02:27:45 2015
@@ -33,12 +33,11 @@ import java.util.List;
 import java.util.Properties;
 import java.util.Random;
 
-import org.apache.pig.ExecType;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigServer;
+import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
 import org.apache.pig.backend.hadoop.executionengine.fetch.FetchLauncher;
 import org.apache.pig.backend.hadoop.executionengine.fetch.FetchOptimizer;
-import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRExecutionEngine;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
@@ -94,7 +93,7 @@ public class TestFetch {
 
     @Before
     public void setUp() throws Exception{
-        pigServer = new PigServer(ExecType.LOCAL, new Properties());
+        pigServer = new PigServer(Util.getLocalTestMode(), new Properties());
         // force direct fetch mode
         
pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_OPT_FETCH,
 "true");
     }
@@ -123,7 +122,7 @@ public class TestFetch {
 
         LogicalPlan lp = ParserTestingUtils.generateLogicalPlan(query);
 
-        PhysicalPlan pp = ((MRExecutionEngine) 
pigServer.getPigContext().getExecutionEngine())
+        PhysicalPlan pp = ((HExecutionEngine) 
pigServer.getPigContext().getExecutionEngine())
                 .compile(lp, null);
 
         boolean planFetchable = 
FetchOptimizer.isPlanFetchable(pigServer.getPigContext(), pp);
@@ -162,7 +161,7 @@ public class TestFetch {
 
         LogicalPlan lp = ParserTestingUtils.generateLogicalPlan(query);
 
-        PhysicalPlan pp = ((MRExecutionEngine) 
pigServer.getPigContext().getExecutionEngine())
+        PhysicalPlan pp = ((HExecutionEngine) 
pigServer.getPigContext().getExecutionEngine())
                 .compile(lp, null);
 
         boolean planFetchable = 
FetchOptimizer.isPlanFetchable(pigServer.getPigContext(), pp);
@@ -228,7 +227,7 @@ public class TestFetch {
             pigServer.setBatchOn();
 
             LogicalPlan lp = TestPigStats.getLogicalPlan(pigServer);
-            PhysicalPlan pp = ((MRExecutionEngine)
+            PhysicalPlan pp = ((HExecutionEngine)
                     
pigServer.getPigContext().getExecutionEngine()).compile(lp, null);
             boolean planFetchable = 
FetchOptimizer.isPlanFetchable(pigServer.getPigContext(), pp);
             assertFalse(planFetchable);
@@ -257,7 +256,7 @@ public class TestFetch {
             pigServer.setBatchOn();
 
             LogicalPlan lp = TestPigStats.getLogicalPlan(pigServer);
-            PhysicalPlan pp = ((MRExecutionEngine)
+            PhysicalPlan pp = ((HExecutionEngine)
                     
pigServer.getPigContext().getExecutionEngine()).compile(lp, null);
             boolean planFetchable = 
FetchOptimizer.isPlanFetchable(pigServer.getPigContext(), pp);
             assertFalse(planFetchable);

Modified: pig/branches/spark/test/org/apache/pig/test/TestFilterOpNumeric.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestFilterOpNumeric.java?rev=1654955&r1=1654954&r2=1654955&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestFilterOpNumeric.java 
(original)
+++ pig/branches/spark/test/org/apache/pig/test/TestFilterOpNumeric.java Tue 
Jan 27 02:27:45 2015
@@ -28,7 +28,6 @@ import java.util.Iterator;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.data.Tuple;
@@ -44,7 +43,7 @@ public class TestFilterOpNumeric {
 
     @Before
     public void setUp() throws Exception {
-        pig = new PigServer(ExecType.LOCAL);
+        pig = new PigServer(Util.getLocalTestMode());
     }
 
     @Test

Modified: pig/branches/spark/test/org/apache/pig/test/TestFilterOpString.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestFilterOpString.java?rev=1654955&r1=1654954&r2=1654955&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestFilterOpString.java 
(original)
+++ pig/branches/spark/test/org/apache/pig/test/TestFilterOpString.java Tue Jan 
27 02:27:45 2015
@@ -28,7 +28,6 @@ import java.util.Iterator;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.data.Tuple;
@@ -46,7 +45,7 @@ public class TestFilterOpString {
     @Before
     public void setUp() throws Exception {
         FileLocalizer.deleteTempFiles();
-        pig = new PigServer(ExecType.LOCAL);
+        pig = new PigServer(Util.getLocalTestMode());
     }
 
     @Test

Modified: pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlan.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlan.java?rev=1654955&r1=1654954&r2=1654955&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlan.java 
(original)
+++ pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlan.java Tue 
Jan 27 02:27:45 2015
@@ -214,18 +214,10 @@ public class TestForEachNestedPlan {
                        "generate group, MIN(E); };");
 
             Iterator<Tuple> iter = pig.openIterator("C");
-
             List<Tuple> expectedResults =
                 Util.getTuplesFromConstantTupleStrings(
                         new String[] {"(10,68)", "(20,78)"});
-
-            int counter = 0;
-            while (iter.hasNext()) {
-               assertEquals(expectedResults.get(counter++).toString(),
-                        iter.next().toString());
-            }
-
-            assertEquals(expectedResults.size(), counter);
+            Util.checkQueryOutputsAfterSort(iter, expectedResults);
         } finally{
             new File(INPUT_FILE).delete();
             try {
@@ -263,14 +255,7 @@ public class TestForEachNestedPlan {
             List<Tuple> expectedResults =
                 Util.getTuplesFromConstantTupleStrings(
                         new String[] {"(1,3)", "(1,4)", "(2,3)", "(2,4)"});
-
-            int counter = 0;
-            while (iter.hasNext()) {
-                assertEquals(expectedResults.get(counter++).toString(),
-                        iter.next().toString());
-            }
-
-            assertEquals(expectedResults.size(), counter);
+            Util.checkQueryOutputsAfterSort(iter, expectedResults);
         } finally{
             new File(INPUT_FILE).delete();
             try {

Modified: 
pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java?rev=1654955&r1=1654954&r2=1654955&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java 
(original)
+++ pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java 
Tue Jan 27 02:27:45 2015
@@ -19,7 +19,6 @@
 package org.apache.pig.test;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 
 import java.io.File;
 import java.io.FileOutputStream;
@@ -30,7 +29,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Random;
 
-import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.test.utils.TestHelper;
@@ -41,7 +39,7 @@ public class TestForEachNestedPlanLocal
     private PigServer pig ;
 
     public TestForEachNestedPlanLocal() throws Throwable {
-        pig = new PigServer(ExecType.LOCAL) ;
+        pig = new PigServer(Util.getLocalTestMode()) ;
     }
 
     Boolean[] nullFlags = new Boolean[]{ false, true };
@@ -79,19 +77,9 @@ public class TestForEachNestedPlanLocal
         pig.registerQuery("c = foreach b { " + "     c1 = limit $1 5; "
                 + "    generate COUNT(c1); " + "};");
         Iterator<Tuple> it = pig.openIterator("c");
-        Tuple t = null;
-        long count[] = new long[3];
-        for (int i = 0; i < 3 && it.hasNext(); i++) {
-            t = it.next();
-            count[i] = (Long)t.get(0);
-        }
-
-        assertFalse(it.hasNext());
+        List<Tuple> expected = Util.getTuplesFromConstantTupleStrings(new 
String[] {"(5L)", "(5L)", "(3L)" });
 
-        // Pig's previous local mode was screwed up correcting that
-        assertEquals(5L, count[0]);
-        assertEquals(5L, count[1]);
-        assertEquals(3L, count[2]);
+        Util.checkQueryOutputsAfterSort(it, expected);
     }
 
     @Test

Modified: pig/branches/spark/test/org/apache/pig/test/TestForEachStar.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestForEachStar.java?rev=1654955&r1=1654954&r2=1654955&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestForEachStar.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestForEachStar.java Tue Jan 27 
02:27:45 2015
@@ -20,17 +20,12 @@ package org.apache.pig.test;
 import static org.junit.Assert.*;
 
 import java.io.File;
-import java.io.IOException;
 import java.util.Iterator;
-import java.util.List;
 
-import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
-import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
-import org.apache.pig.parser.ParserException;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -65,8 +60,8 @@ public class TestForEachStar {
     }
 
     @Test
-    public void testForeachStarSchemaUnkown() throws IOException, 
ParserException{
-        PigServer pig = new PigServer(ExecType.LOCAL);
+    public void testForeachStarSchemaUnkown() throws Exception {
+        PigServer pig = new PigServer(Util.getLocalTestMode());
         String query =
             "  l1 = load '" + INPUT_FILE + "' ;"
             + "f1 = foreach l1 generate * ;"

Modified: pig/branches/spark/test/org/apache/pig/test/TestHBaseStorage.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestHBaseStorage.java?rev=1654955&r1=1654954&r2=1654955&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestHBaseStorage.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestHBaseStorage.java Tue Jan 
27 02:27:45 2015
@@ -1047,6 +1047,190 @@ public class TestHBaseStorage {
 
     /**
      * load from hbase 'TESTTABLE_1' using HBaseBinary format, and store it 
into
+     * 'TESTTABLE_2' using HBaseBinaryFormat setting the timestamp
+     *
+     * @throws IOException
+     */
+    @Test
+    public void testStoreToHBase_1_with_timestamp() throws IOException {
+        prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);
+        prepareTable(TESTTABLE_2, false, DataFormat.HBaseBinary);
+        scanTable1(pig, DataFormat.HBaseBinary);
+        long timestamp = System.currentTimeMillis();
+        pig.registerQuery("b = FOREACH a GENERATE rowKey, " + timestamp + "l, 
col_a, col_b, col_c;");
+        pig.store("b",  "hbase://" + TESTTABLE_2,
+                "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+                + TESTCOLUMN_A + " " + TESTCOLUMN_B + " "
+                + TESTCOLUMN_C + "','-caster HBaseBinaryConverter 
-includeTimestamp true')");
+
+        HTable table = new HTable(conf, TESTTABLE_2);
+        ResultScanner scanner = table.getScanner(new Scan());
+        Iterator<Result> iter = scanner.iterator();
+        int i = 0;
+        for (i = 0; iter.hasNext(); ++i) {
+            Result result = iter.next();
+            String v = String.valueOf(i);
+            String rowKey = Bytes.toString(result.getRow());
+            int col_a = Bytes.toInt(getColValue(result, TESTCOLUMN_A));
+            double col_b = Bytes.toDouble(getColValue(result, TESTCOLUMN_B));
+            String col_c = Bytes.toString(getColValue(result, TESTCOLUMN_C));
+
+            long col_a_ts = getColTimestamp(result, TESTCOLUMN_A);
+            long col_b_ts = getColTimestamp(result, TESTCOLUMN_B);
+            long col_c_ts = getColTimestamp(result, TESTCOLUMN_C);
+            
+            Assert.assertEquals(timestamp, col_a_ts);
+            Assert.assertEquals(timestamp, col_b_ts);
+            Assert.assertEquals(timestamp, col_c_ts);
+
+            Assert.assertEquals("00".substring(v.length()) + v, rowKey);
+            Assert.assertEquals(i, col_a);
+            Assert.assertEquals(i + 0.0, col_b, 1e-6);
+            Assert.assertEquals("Text_" + i, col_c);
+        }
+        Assert.assertEquals(100, i);
+        table.close();
+    }
+
+    /**
+     * load from hbase 'TESTTABLE_1' using HBaseBinary format, and store it 
into
+     * 'TESTTABLE_2' using HBaseBinaryFormat setting the timestamp
+     *
+     * @throws IOException
+     */
+    @Test
+    public void testStoreToHBase_1_with_datetime_timestamp() throws 
IOException {
+        prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);
+        prepareTable(TESTTABLE_2, false, DataFormat.HBaseBinary);
+        scanTable1(pig, DataFormat.HBaseBinary);
+        long timestamp = System.currentTimeMillis();
+        pig.registerQuery("b = FOREACH a GENERATE rowKey, ToDate(" + timestamp 
+ "l), col_a, col_b, col_c;");
+        pig.store("b",  "hbase://" + TESTTABLE_2,
+                "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+                + TESTCOLUMN_A + " " + TESTCOLUMN_B + " "
+                + TESTCOLUMN_C + "','-caster HBaseBinaryConverter 
-includeTimestamp true')");
+
+        HTable table = new HTable(conf, TESTTABLE_2);
+        ResultScanner scanner = table.getScanner(new Scan());
+        Iterator<Result> iter = scanner.iterator();
+        int i = 0;
+        for (i = 0; iter.hasNext(); ++i) {
+            Result result = iter.next();
+            String v = String.valueOf(i);
+            String rowKey = Bytes.toString(result.getRow());
+            int col_a = Bytes.toInt(getColValue(result, TESTCOLUMN_A));
+            double col_b = Bytes.toDouble(getColValue(result, TESTCOLUMN_B));
+            String col_c = Bytes.toString(getColValue(result, TESTCOLUMN_C));
+
+            long col_a_ts = getColTimestamp(result, TESTCOLUMN_A);
+            long col_b_ts = getColTimestamp(result, TESTCOLUMN_B);
+            long col_c_ts = getColTimestamp(result, TESTCOLUMN_C);
+            
+            Assert.assertEquals(timestamp, col_a_ts);
+            Assert.assertEquals(timestamp, col_b_ts);
+            Assert.assertEquals(timestamp, col_c_ts);
+
+            Assert.assertEquals("00".substring(v.length()) + v, rowKey);
+            Assert.assertEquals(i, col_a);
+            Assert.assertEquals(i + 0.0, col_b, 1e-6);
+            Assert.assertEquals("Text_" + i, col_c);
+        }
+        Assert.assertEquals(100, i);
+        table.close();
+    }
+
+    /**
+     * load from hbase 'TESTTABLE_1' using HBaseBinary format, and store it 
into
+     * 'TESTTABLE_2' using HBaseBinaryFormat setting the timestamp
+     *
+     * @throws IOException
+     */
+    @Test
+    public void testStoreToHBase_1_with_bytearray_timestamp() throws 
IOException {
+        prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);
+        prepareTable(TESTTABLE_2, false, DataFormat.HBaseBinary);
+        scanTable1(pig, DataFormat.HBaseBinary);
+        long timestamp = System.currentTimeMillis();
+        pig.registerQuery("b = FOREACH a GENERATE rowKey, " + timestamp + "l 
as timestamp:bytearray, col_a, col_b, col_c;");
+        pig.store("b",  "hbase://" + TESTTABLE_2,
+                "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+                + TESTCOLUMN_A + " " + TESTCOLUMN_B + " "
+                + TESTCOLUMN_C + "','-caster HBaseBinaryConverter 
-includeTimestamp true')");
+
+        HTable table = new HTable(conf, TESTTABLE_2);
+        ResultScanner scanner = table.getScanner(new Scan());
+        Iterator<Result> iter = scanner.iterator();
+        int i = 0;
+        for (i = 0; iter.hasNext(); ++i) {
+            Result result = iter.next();
+            String v = String.valueOf(i);
+            String rowKey = Bytes.toString(result.getRow());
+            int col_a = Bytes.toInt(getColValue(result, TESTCOLUMN_A));
+            double col_b = Bytes.toDouble(getColValue(result, TESTCOLUMN_B));
+            String col_c = Bytes.toString(getColValue(result, TESTCOLUMN_C));
+
+            long col_a_ts = getColTimestamp(result, TESTCOLUMN_A);
+            long col_b_ts = getColTimestamp(result, TESTCOLUMN_B);
+            long col_c_ts = getColTimestamp(result, TESTCOLUMN_C);
+            
+            Assert.assertEquals(timestamp, col_a_ts);
+            Assert.assertEquals(timestamp, col_b_ts);
+            Assert.assertEquals(timestamp, col_c_ts);
+
+            Assert.assertEquals("00".substring(v.length()) + v, rowKey);
+            Assert.assertEquals(i, col_a);
+            Assert.assertEquals(i + 0.0, col_b, 1e-6);
+            Assert.assertEquals("Text_" + i, col_c);
+        }
+        Assert.assertEquals(100, i);
+        table.close();
+    }
+
+    /**
+     * load from hbase 'TESTTABLE_1' using HBaseBinary format, and store it 
into
+     * 'TESTTABLE_1' deleting odd row keys
+     *
+     * @throws IOException
+     */
+    @Test
+    public void testStoreToHBase_1_with_delete() throws IOException {
+        prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);
+        scanTable1(pig, DataFormat.HBaseBinary);
+        pig.registerQuery("b = FOREACH a GENERATE rowKey, 
(boolean)(((int)rowKey) % 2), col_a, col_b, col_c;");
+        pig.store("b",  "hbase://" + TESTTABLE_1,
+                "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+                + TESTCOLUMN_A + " " + TESTCOLUMN_B + " "
+                + TESTCOLUMN_C + "','-caster HBaseBinaryConverter 
-includeTombstone true')");
+
+        HTable table = new HTable(conf, TESTTABLE_1);
+        ResultScanner scanner = table.getScanner(new Scan());
+        Iterator<Result> iter = scanner.iterator();
+        int count = 0;
+        for (int i = 0; iter.hasNext(); i = i + 2) {
+            Result result = iter.next();
+            String v = String.valueOf(i);
+            String rowKey = Bytes.toString(result.getRow());
+            int col_a = Bytes.toInt(getColValue(result, TESTCOLUMN_A));
+            double col_b = Bytes.toDouble(getColValue(result, TESTCOLUMN_B));
+            String col_c = Bytes.toString(getColValue(result, TESTCOLUMN_C));
+
+            long col_a_ts = getColTimestamp(result, TESTCOLUMN_A);
+            long col_b_ts = getColTimestamp(result, TESTCOLUMN_B);
+            long col_c_ts = getColTimestamp(result, TESTCOLUMN_C);
+            
+            Assert.assertEquals("00".substring(v.length()) + v, rowKey);
+            Assert.assertEquals(i, col_a);
+            Assert.assertEquals(i + 0.0, col_b, 1e-6);
+            Assert.assertEquals("Text_" + i, col_c);
+
+            count++;
+        }
+        Assert.assertEquals(50, count);
+        table.close();
+    }
+
+    /**
+     * load from hbase 'TESTTABLE_1' using HBaseBinary format, and store it 
into
      * 'TESTTABLE_2' using UTF-8 Plain Text format
      *
      * @throws IOException
@@ -1094,6 +1278,7 @@ public class TestHBaseStorage {
         Object key = "somekey";
         byte type = DataType.CHARARRAY;
         Assert.assertFalse(hbaseStorage.createPut(key, type).getWriteToWAL());
+        Assert.assertFalse(hbaseStorage.createDelete(key, type, 
System.currentTimeMillis()).getWriteToWAL());
     }
 
     /**
@@ -1108,6 +1293,7 @@ public class TestHBaseStorage {
         Object key = "somekey";
         byte type = DataType.CHARARRAY;
         Assert.assertTrue(hbaseStorage.createPut(key, type).getWriteToWAL());
+        Assert.assertTrue(hbaseStorage.createDelete(key, type, 
System.currentTimeMillis()).getWriteToWAL());
     }
 
     /**
@@ -1357,4 +1543,16 @@ public class TestHBaseStorage {
         return result.getValue(colArray[0], colArray[1]);
 
     }
+
+    /**
+     * Helper to deal with fetching a timestamp based on a cf:colname string 
spec
+     * @param result
+     * @param colName
+     * @return
+     */
+    private static long getColTimestamp(Result result, String colName) {
+        byte[][] colArray = Bytes.toByteArrays(colName.split(":"));
+        return result.getColumnLatestCell(colArray[0], 
colArray[1]).getTimestamp();
+    }
+
 }


Reply via email to