Modified: pig/branches/spark/test/org/apache/pig/test/TestCubeOperator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestCubeOperator.java?rev=1654955&r1=1654954&r2=1654955&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestCubeOperator.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestCubeOperator.java Tue Jan 27 02:27:45 2015 @@ -39,6 +39,7 @@ import org.apache.pig.impl.logicalLayer. import org.apache.pig.impl.logicalLayer.schema.Schema; import org.apache.pig.newplan.Operator; import org.junit.AfterClass; +import org.junit.Assume; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -53,27 +54,27 @@ public class TestCubeOperator { @BeforeClass public static void oneTimeSetUp() throws Exception { - pigServer = new PigServer("local"); + pigServer = new PigServer(Util.getLocalTestMode()); } @Before public void setUp() throws Exception { - data = resetData(pigServer); - data.set("input", tuple("dog", "miami", 12), tuple("cat", "miami", 18), - tuple("turtle", "tampa", 4), tuple("dog", "tampa", 14), tuple("cat", "naples", 9), - tuple("dog", "naples", 5), tuple("turtle", "naples", 1)); - - data.set("input1", tuple("u1,men,green,mango"), tuple("u2,men,red,mango"), - tuple("u3,men,green,apple"), tuple("u4,women,red,mango"), - tuple("u6,women,green,mango"), tuple("u7,men,red,apple"), - tuple("u8,men,green,mango"), tuple("u9,women,red,apple"), - tuple("u10,women,green,apple"), tuple("u11,men,red,apple"), - tuple("u12,women,green,mango")); + data = resetData(pigServer); + data.set("input", tuple("dog", "miami", 12), tuple("cat", "miami", 18), + tuple("turtle", "tampa", 4), tuple("dog", "tampa", 14), tuple("cat", "naples", 9), + tuple("dog", "naples", 5), tuple("turtle", "naples", 1)); + + data.set("input1", tuple("u1,men,green,mango"), tuple("u2,men,red,mango"), + tuple("u3,men,green,apple"), tuple("u4,women,red,mango"), + tuple("u6,women,green,mango"), tuple("u7,men,red,apple"), + tuple("u8,men,green,mango"), tuple("u9,women,red,apple"), + tuple("u10,women,green,apple"), tuple("u11,men,red,apple"), + tuple("u12,women,green,mango")); - data.set("input2", tuple("dog", "miami", "white", "pet", 5)); + data.set("input2", tuple("dog", "miami", "white", "pet", 5)); - data.set("input3", tuple("dog", "miami", 12), tuple(null, "miami", 18)); + data.set("input3", tuple("dog", "miami", 12), tuple(null, "miami", 18)); } @@ -83,533 +84,637 @@ public class TestCubeOperator { @Test public void testCubeBasic() throws IOException { - // basic correctness test - String query = "a = load 'input' USING mock.Storage() as (x:chararray,y:chararray,z:long);" - + "b = cube a by cube(x,y);" - + "c = foreach b generate flatten(group) as (type,location), COUNT_STAR(cube) as count, SUM(cube.z) as total;\n" - + "store c into 'output' using mock.Storage();"; - Util.registerMultiLineQuery(pigServer, query); - - Set<Tuple> expected = ImmutableSet.of( - tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1, (long) 18)), - tf.newTuple(Lists.newArrayList("cat", "naples", (long) 1, (long) 9)), - tf.newTuple(Lists.newArrayList("cat", null, (long) 2, (long) 27)), - tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, (long) 12)), - tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, (long) 14)), - tf.newTuple(Lists.newArrayList("dog", "naples", (long) 1, (long) 5)), - tf.newTuple(Lists.newArrayList("dog", null, (long) 3, (long) 31)), - tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1, (long) 4)), - tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1, (long) 1)), - tf.newTuple(Lists.newArrayList("turtle", null, (long) 2, (long) 5)), - tf.newTuple(Lists.newArrayList(null, "miami", (long) 2, (long) 30)), - tf.newTuple(Lists.newArrayList(null, "tampa", (long) 2, (long) 18)), - tf.newTuple(Lists.newArrayList(null, "naples", (long) 3, (long) 15)), - tf.newTuple(Lists.newArrayList(null, null, (long) 7, (long) 63))); - - List<Tuple> out = data.get("output"); - for (Tuple tup : out) { - assertTrue(expected + " contains " + tup, expected.contains(tup)); - } + // basic correctness test + String query = "a = load 'input' USING mock.Storage() as (x:chararray,y:chararray,z:long);" + + "b = cube a by cube(x,y);" + + "c = foreach b generate flatten(group) as (type,location), COUNT_STAR(cube) as count, SUM(cube.z) as total;\n" + + "store c into 'output' using mock.Storage();"; + Util.registerMultiLineQuery(pigServer, query); + + Set<Tuple> expected = ImmutableSet.of( + tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1, (long) 18)), + tf.newTuple(Lists.newArrayList("cat", "naples", (long) 1, (long) 9)), + tf.newTuple(Lists.newArrayList("cat", null, (long) 2, (long) 27)), + tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, (long) 12)), + tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, (long) 14)), + tf.newTuple(Lists.newArrayList("dog", "naples", (long) 1, (long) 5)), + tf.newTuple(Lists.newArrayList("dog", null, (long) 3, (long) 31)), + tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1, (long) 4)), + tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1, (long) 1)), + tf.newTuple(Lists.newArrayList("turtle", null, (long) 2, (long) 5)), + tf.newTuple(Lists.newArrayList(null, "miami", (long) 2, (long) 30)), + tf.newTuple(Lists.newArrayList(null, "tampa", (long) 2, (long) 18)), + tf.newTuple(Lists.newArrayList(null, "naples", (long) 3, (long) 15)), + tf.newTuple(Lists.newArrayList(null, null, (long) 7, (long) 63))); + + List<Tuple> out = data.get("output"); + for (Tuple tup : out) { + assertTrue(expected + " contains " + tup, expected.contains(tup)); + } } @Test public void testRollupBasic() throws IOException { - // basic correctness test - String query = "a = load 'input' USING mock.Storage() as (x:chararray,y:chararray,z:long);" - + "b = cube a by rollup(x,y);" - + "c = foreach b generate flatten(group) as (type,location), COUNT_STAR(cube) as count, SUM(cube.z) as total;" - + "store c into 'output' using mock.Storage();"; - Util.registerMultiLineQuery(pigServer, query); - - Set<Tuple> expected = ImmutableSet.of( - tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1, (long) 18)), - tf.newTuple(Lists.newArrayList("cat", "naples", (long) 1, (long) 9)), - tf.newTuple(Lists.newArrayList("cat", null, (long) 2, (long) 27)), - tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, (long) 12)), - tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, (long) 14)), - tf.newTuple(Lists.newArrayList("dog", "naples", (long) 1, (long) 5)), - tf.newTuple(Lists.newArrayList("dog", null, (long) 3, (long) 31)), - tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1, (long) 4)), - tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1, (long) 1)), - tf.newTuple(Lists.newArrayList("turtle", null, (long) 2, (long) 5)), - tf.newTuple(Lists.newArrayList(null, null, (long) 7, (long) 63))); - - List<Tuple> out = data.get("output"); - for (Tuple tup : out) { - assertTrue(expected + " contains " + tup, expected.contains(tup)); - } + // basic correctness test + String query = "a = load 'input' USING mock.Storage() as (x:chararray,y:chararray,z:long);" + + "b = cube a by rollup(x,y);" + + "c = foreach b generate flatten(group) as (type,location), COUNT_STAR(cube) as count, SUM(cube.z) as total;" + + "store c into 'output' using mock.Storage();"; + Util.registerMultiLineQuery(pigServer, query); + + Set<Tuple> expected = ImmutableSet.of( + tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1, (long) 18)), + tf.newTuple(Lists.newArrayList("cat", "naples", (long) 1, (long) 9)), + tf.newTuple(Lists.newArrayList("cat", null, (long) 2, (long) 27)), + tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, (long) 12)), + tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, (long) 14)), + tf.newTuple(Lists.newArrayList("dog", "naples", (long) 1, (long) 5)), + tf.newTuple(Lists.newArrayList("dog", null, (long) 3, (long) 31)), + tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1, (long) 4)), + tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1, (long) 1)), + tf.newTuple(Lists.newArrayList("turtle", null, (long) 2, (long) 5)), + tf.newTuple(Lists.newArrayList(null, null, (long) 7, (long) 63))); + + List<Tuple> out = data.get("output"); + for (Tuple tup : out) { + assertTrue(expected + " contains " + tup, expected.contains(tup)); + } + } + + @Test + public void testRollupHIIBasic() throws IOException { + // basic correctness test + String query = "a = load 'input' USING mock.Storage() as (x:chararray,y:chararray,z:long);" + + "b = cube a by rollup(x,y) pivot 1;" + + "c = foreach b generate flatten(group) as (type,location), COUNT_STAR(cube) as count, SUM(cube.z) as total;" + + "store c into 'output' using mock.Storage();"; + Util.registerMultiLineQuery(pigServer, query); + + Set<Tuple> expected = ImmutableSet.of( + tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1, (long) 18)), + tf.newTuple(Lists.newArrayList("cat", "naples", (long) 1, (long) 9)), + tf.newTuple(Lists.newArrayList("cat", null, (long) 2, (long) 27)), + tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, (long) 12)), + tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, (long) 14)), + tf.newTuple(Lists.newArrayList("dog", "naples", (long) 1, (long) 5)), + tf.newTuple(Lists.newArrayList("dog", null, (long) 3, (long) 31)), + tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1, (long) 4)), + tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1, (long) 1)), + tf.newTuple(Lists.newArrayList("turtle", null, (long) 2, (long) 5)), + tf.newTuple(Lists.newArrayList(null, null, (long) 7, (long) 63))); + + List<Tuple> out = data.get("output"); + for (Tuple tup : out) { + assertTrue(expected + " contains " + tup, expected.contains(tup)); + } } @Test public void testCubeAndRollup() throws IOException { - // basic correctness test - String query = "a = load 'input2' USING mock.Storage() as (v:chararray,w:chararray,x:chararray,y:chararray,z:long);" - + "b = cube a by cube(v,w), rollup(x,y);" - + "c = foreach b generate flatten(group) as (type,location,color,category), COUNT_STAR(cube) as count, SUM(cube.z) as total;" - + "store c into 'output' using mock.Storage();"; - Util.registerMultiLineQuery(pigServer, query); - - Set<Tuple> expected = ImmutableSet - .of(tf.newTuple(Lists.newArrayList("dog", "miami", "white", "pet", (long) 1, - (long) 5)), tf.newTuple(Lists.newArrayList("dog", null, "white", "pet", - (long) 1, (long) 5)), tf.newTuple(Lists.newArrayList(null, "miami", - "white", "pet", (long) 1, (long) 5)), tf.newTuple(Lists.newArrayList(null, - null, "white", "pet", (long) 1, (long) 5)), tf.newTuple(Lists.newArrayList( - "dog", "miami", "white", null, (long) 1, (long) 5)), tf.newTuple(Lists - .newArrayList("dog", null, "white", null, (long) 1, (long) 5)), tf - .newTuple(Lists.newArrayList(null, "miami", "white", null, (long) 1, - (long) 5)), tf.newTuple(Lists.newArrayList(null, null, "white", - null, (long) 1, (long) 5)), tf.newTuple(Lists.newArrayList("dog", "miami", - null, null, (long) 1, (long) 5)), tf.newTuple(Lists.newArrayList("dog", - null, null, null, (long) 1, (long) 5)), tf.newTuple(Lists.newArrayList( - null, "miami", null, null, (long) 1, (long) 5)), tf.newTuple(Lists - .newArrayList(null, null, null, null, (long) 1, (long) 5))); - - List<Tuple> out = data.get("output"); - for (Tuple tup : out) { - assertTrue(expected + " contains " + tup, expected.contains(tup)); - } + // basic correctness test + String query = "a = load 'input2' USING mock.Storage() as (v:chararray,w:chararray,x:chararray,y:chararray,z:long);" + + "b = cube a by cube(v,w), rollup(x,y);" + + "c = foreach b generate flatten(group) as (type,location,color,category), COUNT_STAR(cube) as count, SUM(cube.z) as total;" + + "store c into 'output' using mock.Storage();"; + Util.registerMultiLineQuery(pigServer, query); + + Set<Tuple> expected = ImmutableSet + .of(tf.newTuple(Lists.newArrayList("dog", "miami", "white", "pet", (long) 1, + (long) 5)), tf.newTuple(Lists.newArrayList("dog", null, "white", "pet", + (long) 1, (long) 5)), tf.newTuple(Lists.newArrayList(null, "miami", + "white", "pet", (long) 1, (long) 5)), tf.newTuple(Lists.newArrayList(null, + null, "white", "pet", (long) 1, (long) 5)), tf.newTuple(Lists.newArrayList( + "dog", "miami", "white", null, (long) 1, (long) 5)), tf.newTuple(Lists + .newArrayList("dog", null, "white", null, (long) 1, (long) 5)), tf + .newTuple(Lists.newArrayList(null, "miami", "white", null, (long) 1, + (long) 5)), tf.newTuple(Lists.newArrayList(null, null, "white", + null, (long) 1, (long) 5)), tf.newTuple(Lists.newArrayList("dog", "miami", + null, null, (long) 1, (long) 5)), tf.newTuple(Lists.newArrayList("dog", + null, null, null, (long) 1, (long) 5)), tf.newTuple(Lists.newArrayList( + null, "miami", null, null, (long) 1, (long) 5)), tf.newTuple(Lists + .newArrayList(null, null, null, null, (long) 1, (long) 5))); + + List<Tuple> out = data.get("output"); + for (Tuple tup : out) { + assertTrue(expected + " contains " + tup, expected.contains(tup)); + } + + } + + @Test + public void testCubeAndRollupHII() throws IOException { + // basic correctness test + String query = "a = load 'input2' USING mock.Storage() as (v:chararray,w:chararray,x:chararray,y:chararray,z:long);" + + "b = cube a by cube(v,w), rollup(x,y) pivot 1;" + + "c = foreach b generate flatten(group) as (type,location,color,category), COUNT_STAR(cube) as count, SUM(cube.z) as total;" + + "store c into 'output' using mock.Storage();"; + Util.registerMultiLineQuery(pigServer, query); + + Set<Tuple> expected = ImmutableSet + .of(tf.newTuple(Lists.newArrayList("dog", "miami", "white", "pet", (long) 1, + (long) 5)), tf.newTuple(Lists.newArrayList("dog", null, "white", "pet", + (long) 1, (long) 5)), tf.newTuple(Lists.newArrayList(null, "miami", + "white", "pet", (long) 1, (long) 5)), tf.newTuple(Lists.newArrayList(null, + null, "white", "pet", (long) 1, (long) 5)), tf.newTuple(Lists.newArrayList( + "dog", "miami", "white", null, (long) 1, (long) 5)), tf.newTuple(Lists + .newArrayList("dog", null, "white", null, (long) 1, (long) 5)), tf + .newTuple(Lists.newArrayList(null, "miami", "white", null, (long) 1, + (long) 5)), tf.newTuple(Lists.newArrayList(null, null, "white", + null, (long) 1, (long) 5)), tf.newTuple(Lists.newArrayList("dog", "miami", + null, null, (long) 1, (long) 5)), tf.newTuple(Lists.newArrayList("dog", + null, null, null, (long) 1, (long) 5)), tf.newTuple(Lists.newArrayList( + null, "miami", null, null, (long) 1, (long) 5)), tf.newTuple(Lists + .newArrayList(null, null, null, null, (long) 1, (long) 5))); + + List<Tuple> out = data.get("output"); + for (Tuple tup : out) { + assertTrue(expected + " contains " + tup, expected.contains(tup)); + } } @Test public void testCubeMultipleIAliases() throws IOException { - // test for input alias to cube being assigned multiple times - String query = "a = load 'input' USING mock.Storage() as (x:chararray,y:chararray,z:long);" - + "a = load 'input' USING mock.Storage() as (x,y:chararray,z:long);" - + "a = load 'input' USING mock.Storage() as (x:chararray,y:chararray,z:long);" - + "b = cube a by cube(x,y);" - + "c = foreach b generate flatten(group) as (type,location), COUNT_STAR(cube) as count, SUM(cube.z) as total;" - + "store c into 'output' using mock.Storage();"; - - Util.registerMultiLineQuery(pigServer, query); - - Set<Tuple> expected = ImmutableSet.of( - tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1, (long) 18)), - tf.newTuple(Lists.newArrayList("cat", "naples", (long) 1, (long) 9)), - tf.newTuple(Lists.newArrayList("cat", null, (long) 2, (long) 27)), - tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, (long) 12)), - tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, (long) 14)), - tf.newTuple(Lists.newArrayList("dog", "naples", (long) 1, (long) 5)), - tf.newTuple(Lists.newArrayList("dog", null, (long) 3, (long) 31)), - tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1, (long) 4)), - tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1, (long) 1)), - tf.newTuple(Lists.newArrayList("turtle", null, (long) 2, (long) 5)), - tf.newTuple(Lists.newArrayList(null, "miami", (long) 2, (long) 30)), - tf.newTuple(Lists.newArrayList(null, "tampa", (long) 2, (long) 18)), - tf.newTuple(Lists.newArrayList(null, "naples", (long) 3, (long) 15)), - tf.newTuple(Lists.newArrayList(null, null, (long) 7, (long) 63))); - - List<Tuple> out = data.get("output"); - for (Tuple tup : out) { - assertTrue(expected + " contains " + tup, expected.contains(tup)); - } + // test for input alias to cube being assigned multiple times + String query = "a = load 'input' USING mock.Storage() as (x:chararray,y:chararray,z:long);" + + "a = load 'input' USING mock.Storage() as (x,y:chararray,z:long);" + + "a = load 'input' USING mock.Storage() as (x:chararray,y:chararray,z:long);" + + "b = cube a by cube(x,y);" + + "c = foreach b generate flatten(group) as (type,location), COUNT_STAR(cube) as count, SUM(cube.z) as total;" + + "store c into 'output' using mock.Storage();"; + + Util.registerMultiLineQuery(pigServer, query); + + Set<Tuple> expected = ImmutableSet.of( + tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1, (long) 18)), + tf.newTuple(Lists.newArrayList("cat", "naples", (long) 1, (long) 9)), + tf.newTuple(Lists.newArrayList("cat", null, (long) 2, (long) 27)), + tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, (long) 12)), + tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, (long) 14)), + tf.newTuple(Lists.newArrayList("dog", "naples", (long) 1, (long) 5)), + tf.newTuple(Lists.newArrayList("dog", null, (long) 3, (long) 31)), + tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1, (long) 4)), + tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1, (long) 1)), + tf.newTuple(Lists.newArrayList("turtle", null, (long) 2, (long) 5)), + tf.newTuple(Lists.newArrayList(null, "miami", (long) 2, (long) 30)), + tf.newTuple(Lists.newArrayList(null, "tampa", (long) 2, (long) 18)), + tf.newTuple(Lists.newArrayList(null, "naples", (long) 3, (long) 15)), + tf.newTuple(Lists.newArrayList(null, null, (long) 7, (long) 63))); + + List<Tuple> out = data.get("output"); + for (Tuple tup : out) { + assertTrue(expected + " contains " + tup, expected.contains(tup)); + } } @Test public void testCubeAfterForeach() throws IOException { - // test for foreach projection before cube operator - String query = "a = load 'input' USING mock.Storage() as (x:chararray,y:chararray,z:long);" - + "b = foreach a generate x as type,y as location,z as number;" - + "c = cube b by cube(type,location);" - + "d = foreach c generate flatten(group) as (type,location), COUNT_STAR(cube) as count, SUM(cube.number) as total;" - + "store d into 'output' using mock.Storage();"; - - Util.registerMultiLineQuery(pigServer, query); - - Set<Tuple> expected = ImmutableSet.of( - tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1, (long) 18)), - tf.newTuple(Lists.newArrayList("cat", "naples", (long) 1, (long) 9)), - tf.newTuple(Lists.newArrayList("cat", null, (long) 2, (long) 27)), - tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, (long) 12)), - tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, (long) 14)), - tf.newTuple(Lists.newArrayList("dog", "naples", (long) 1, (long) 5)), - tf.newTuple(Lists.newArrayList("dog", null, (long) 3, (long) 31)), - tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1, (long) 4)), - tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1, (long) 1)), - tf.newTuple(Lists.newArrayList("turtle", null, (long) 2, (long) 5)), - tf.newTuple(Lists.newArrayList(null, "miami", (long) 2, (long) 30)), - tf.newTuple(Lists.newArrayList(null, "tampa", (long) 2, (long) 18)), - tf.newTuple(Lists.newArrayList(null, "naples", (long) 3, (long) 15)), - tf.newTuple(Lists.newArrayList(null, null, (long) 7, (long) 63))); - - List<Tuple> out = data.get("output"); - for (Tuple tup : out) { - assertTrue(expected + " contains " + tup, expected.contains(tup)); - } + // test for foreach projection before cube operator + String query = "a = load 'input' USING mock.Storage() as (x:chararray,y:chararray,z:long);" + + "b = foreach a generate x as type,y as location,z as number;" + + "c = cube b by cube(type,location);" + + "d = foreach c generate flatten(group) as (type,location), COUNT_STAR(cube) as count, SUM(cube.number) as total;" + + "store d into 'output' using mock.Storage();"; + + Util.registerMultiLineQuery(pigServer, query); + + Set<Tuple> expected = ImmutableSet.of( + tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1, (long) 18)), + tf.newTuple(Lists.newArrayList("cat", "naples", (long) 1, (long) 9)), + tf.newTuple(Lists.newArrayList("cat", null, (long) 2, (long) 27)), + tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, (long) 12)), + tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, (long) 14)), + tf.newTuple(Lists.newArrayList("dog", "naples", (long) 1, (long) 5)), + tf.newTuple(Lists.newArrayList("dog", null, (long) 3, (long) 31)), + tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1, (long) 4)), + tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1, (long) 1)), + tf.newTuple(Lists.newArrayList("turtle", null, (long) 2, (long) 5)), + tf.newTuple(Lists.newArrayList(null, "miami", (long) 2, (long) 30)), + tf.newTuple(Lists.newArrayList(null, "tampa", (long) 2, (long) 18)), + tf.newTuple(Lists.newArrayList(null, "naples", (long) 3, (long) 15)), + tf.newTuple(Lists.newArrayList(null, null, (long) 7, (long) 63))); + + List<Tuple> out = data.get("output"); + for (Tuple tup : out) { + assertTrue(expected + " contains " + tup, expected.contains(tup)); + } } @Test public void testCubeAfterLimit() throws IOException { - // test for limit operator before cube operator - String query = "a = load 'input' USING mock.Storage() as (x:chararray,y:chararray,z:long);" - + "b = limit a 2;" + "c = cube b by cube(x,y);" - + "d = foreach c generate flatten(group) as (x,y), SUM(cube.z) as total;" - + "store d into 'output' using mock.Storage();"; - - Util.registerMultiLineQuery(pigServer, query); - - Set<Tuple> expected = ImmutableSet.of( - tf.newTuple(Lists.newArrayList("cat", "miami", (long) 18)), - tf.newTuple(Lists.newArrayList("cat", null, (long) 18)), - tf.newTuple(Lists.newArrayList("dog", "miami", (long) 12)), - tf.newTuple(Lists.newArrayList("dog", null, (long) 12)), - tf.newTuple(Lists.newArrayList(null, "miami", (long) 30)), - tf.newTuple(Lists.newArrayList(null, null, (long) 30))); - - List<Tuple> out = data.get("output"); - for (Tuple tup : out) { - assertTrue(expected + " contains " + tup, expected.contains(tup)); - } + // test for limit operator before cube operator + String query = "a = load 'input' USING mock.Storage() as (x:chararray,y:chararray,z:long);" + + "b = limit a 2;" + "c = cube b by cube(x,y);" + + "d = foreach c generate flatten(group) as (x,y), SUM(cube.z) as total;" + + "store d into 'output' using mock.Storage();"; + + Util.registerMultiLineQuery(pigServer, query); + + Set<Tuple> expected = ImmutableSet.of( + tf.newTuple(Lists.newArrayList("cat", "miami", (long) 18)), + tf.newTuple(Lists.newArrayList("cat", null, (long) 18)), + tf.newTuple(Lists.newArrayList("dog", "miami", (long) 12)), + tf.newTuple(Lists.newArrayList("dog", null, (long) 12)), + tf.newTuple(Lists.newArrayList(null, "miami", (long) 30)), + tf.newTuple(Lists.newArrayList(null, null, (long) 30))); + + List<Tuple> out = data.get("output"); + for (Tuple tup : out) { + assertTrue(expected + " contains " + tup, expected.contains(tup)); + } } @Test public void testCubeWithStar() throws IOException { - // test for * (all) dimensions in cube operator - String query = "a = load 'input' USING mock.Storage() as (x:chararray,y:chararray);" - + "b = foreach a generate x as type,y as location;" - + "c = cube b by cube(*);" - + "d = foreach c generate flatten(group) as (type,location), COUNT_STAR(cube) as count;" - + "store d into 'output' using mock.Storage();"; - - Util.registerMultiLineQuery(pigServer, query); - - Set<Tuple> expected = ImmutableSet.of( - tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1)), - tf.newTuple(Lists.newArrayList("cat", "naples", (long) 1)), - tf.newTuple(Lists.newArrayList("cat", null, (long) 2)), - tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1)), - tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1)), - tf.newTuple(Lists.newArrayList("dog", "naples", (long) 1)), - tf.newTuple(Lists.newArrayList("dog", null, (long) 3)), - tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1)), - tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1)), - tf.newTuple(Lists.newArrayList("turtle", null, (long) 2)), - tf.newTuple(Lists.newArrayList(null, "miami", (long) 2)), - tf.newTuple(Lists.newArrayList(null, "tampa", (long) 2)), - tf.newTuple(Lists.newArrayList(null, "naples", (long) 3)), - tf.newTuple(Lists.newArrayList(null, null, (long) 7))); - - List<Tuple> out = data.get("output"); - for (Tuple tup : out) { - assertTrue(expected + " contains " + tup, expected.contains(tup)); - } + // test for * (all) dimensions in cube operator + String query = "a = load 'input' USING mock.Storage() as (x:chararray,y:chararray);" + + "b = foreach a generate x as type,y as location;" + + "c = cube b by cube(*);" + + "d = foreach c generate flatten(group) as (type,location), COUNT_STAR(cube) as count;" + + "store d into 'output' using mock.Storage();"; + + Util.registerMultiLineQuery(pigServer, query); + + Set<Tuple> expected = ImmutableSet.of( + tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1)), + tf.newTuple(Lists.newArrayList("cat", "naples", (long) 1)), + tf.newTuple(Lists.newArrayList("cat", null, (long) 2)), + tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1)), + tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1)), + tf.newTuple(Lists.newArrayList("dog", "naples", (long) 1)), + tf.newTuple(Lists.newArrayList("dog", null, (long) 3)), + tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1)), + tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1)), + tf.newTuple(Lists.newArrayList("turtle", null, (long) 2)), + tf.newTuple(Lists.newArrayList(null, "miami", (long) 2)), + tf.newTuple(Lists.newArrayList(null, "tampa", (long) 2)), + tf.newTuple(Lists.newArrayList(null, "naples", (long) 3)), + tf.newTuple(Lists.newArrayList(null, null, (long) 7))); + + List<Tuple> out = data.get("output"); + for (Tuple tup : out) { + assertTrue(expected + " contains " + tup, expected.contains(tup)); + } } @Test public void testCubeWithRange() throws IOException { - // test for range projection of dimensions in cube operator - String query = "a = load 'input' USING mock.Storage() as (x:chararray,y:chararray,z:long);" - + "b = foreach a generate x as type,y as location, z as number;" - + "c = cube b by cube($0..$1);" - + "d = foreach c generate flatten(group) as (type,location), COUNT_STAR(cube) as count, SUM(cube.number) as total;" - + "store d into 'output' using mock.Storage();"; - - Util.registerMultiLineQuery(pigServer, query); - - Set<Tuple> expected = ImmutableSet.of( - tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1, (long) 18)), - tf.newTuple(Lists.newArrayList("cat", "naples", (long) 1, (long) 9)), - tf.newTuple(Lists.newArrayList("cat", null, (long) 2, (long) 27)), - tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, (long) 12)), - tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, (long) 14)), - tf.newTuple(Lists.newArrayList("dog", "naples", (long) 1, (long) 5)), - tf.newTuple(Lists.newArrayList("dog", null, (long) 3, (long) 31)), - tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1, (long) 4)), - tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1, (long) 1)), - tf.newTuple(Lists.newArrayList("turtle", null, (long) 2, (long) 5)), - tf.newTuple(Lists.newArrayList(null, "miami", (long) 2, (long) 30)), - tf.newTuple(Lists.newArrayList(null, "tampa", (long) 2, (long) 18)), - tf.newTuple(Lists.newArrayList(null, "naples", (long) 3, (long) 15)), - tf.newTuple(Lists.newArrayList(null, null, (long) 7, (long) 63))); - - List<Tuple> out = data.get("output"); - for (Tuple tup : out) { - assertTrue(expected + " contains " + tup, expected.contains(tup)); - } + // test for range projection of dimensions in cube operator + String query = "a = load 'input' USING mock.Storage() as (x:chararray,y:chararray,z:long);" + + "b = foreach a generate x as type,y as location, z as number;" + + "c = cube b by cube($0..$1);" + + "d = foreach c generate flatten(group) as (type,location), COUNT_STAR(cube) as count, SUM(cube.number) as total;" + + "store d into 'output' using mock.Storage();"; + + Util.registerMultiLineQuery(pigServer, query); + + Set<Tuple> expected = ImmutableSet.of( + tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1, (long) 18)), + tf.newTuple(Lists.newArrayList("cat", "naples", (long) 1, (long) 9)), + tf.newTuple(Lists.newArrayList("cat", null, (long) 2, (long) 27)), + tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, (long) 12)), + tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, (long) 14)), + tf.newTuple(Lists.newArrayList("dog", "naples", (long) 1, (long) 5)), + tf.newTuple(Lists.newArrayList("dog", null, (long) 3, (long) 31)), + tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1, (long) 4)), + tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1, (long) 1)), + tf.newTuple(Lists.newArrayList("turtle", null, (long) 2, (long) 5)), + tf.newTuple(Lists.newArrayList(null, "miami", (long) 2, (long) 30)), + tf.newTuple(Lists.newArrayList(null, "tampa", (long) 2, (long) 18)), + tf.newTuple(Lists.newArrayList(null, "naples", (long) 3, (long) 15)), + tf.newTuple(Lists.newArrayList(null, null, (long) 7, (long) 63))); + + List<Tuple> out = data.get("output"); + for (Tuple tup : out) { + assertTrue(expected + " contains " + tup, expected.contains(tup)); + } } @Test public void testCubeDuplicateDimensions() throws IOException { - // test for cube operator with duplicate dimensions - String query = "a = load 'input' USING mock.Storage() as (x:chararray,y:chararray,z:long);" - + "b = foreach a generate x as type,y as location, z as number;" - + "c = cube b by cube($0..$1,$0..$1);" - + "d = foreach c generate flatten(group), COUNT_STAR(cube) as count, SUM(cube.number) as total;" - + "store d into 'output' using mock.Storage();"; - - try { - Util.registerMultiLineQuery(pigServer, query); - pigServer.openIterator("d"); - } catch (FrontendException e) { - // FEException with 'duplicate dimensions detected' message is throw - return; - } + // test for cube operator with duplicate dimensions + String query = "a = load 'input' USING mock.Storage() as (x:chararray,y:chararray,z:long);" + + "b = foreach a generate x as type,y as location, z as number;" + + "c = cube b by cube($0..$1,$0..$1);" + + "d = foreach c generate flatten(group), COUNT_STAR(cube) as count, SUM(cube.number) as total;" + + "store d into 'output' using mock.Storage();"; + + try { + Util.registerMultiLineQuery(pigServer, query); + pigServer.openIterator("d"); + } catch (FrontendException e) { + // FEException with 'duplicate dimensions detected' message is throw + return; + } - Assert.fail("Expected to throw an exception when duplicate dimensions are detected!"); + Assert.fail("Expected to throw an exception when duplicate dimensions are detected!"); } @Test public void testCubeAfterFilter() throws IOException { - // test for filtering before cube operator - String query = "a = load 'input' USING mock.Storage() as (x:chararray,y:chararray,z:long);" - + "b = filter a by x == 'dog';" - + "c = cube b by cube(x,y);" - + "d = foreach c generate flatten(group), COUNT_STAR(cube) as count, SUM(cube.z) as total;" - + "store d into 'output' using mock.Storage();"; - - Util.registerMultiLineQuery(pigServer, query); - // Iterator<Tuple> it = pigServer.openIterator("d"); - - Set<Tuple> expected = ImmutableSet.of( - tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, (long) 12)), - tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, (long) 14)), - tf.newTuple(Lists.newArrayList("dog", "naples", (long) 1, (long) 5)), - tf.newTuple(Lists.newArrayList("dog", null, (long) 3, (long) 31)), - tf.newTuple(Lists.newArrayList(null, "miami", (long) 1, (long) 12)), - tf.newTuple(Lists.newArrayList(null, "tampa", (long) 1, (long) 14)), - tf.newTuple(Lists.newArrayList(null, "naples", (long) 1, (long) 5)), - tf.newTuple(Lists.newArrayList(null, null, (long) 3, (long) 31))); - - List<Tuple> out = data.get("output"); - for (Tuple tup : out) { - assertTrue(expected + " contains " + tup, expected.contains(tup)); - } + // test for filtering before cube operator + String query = "a = load 'input' USING mock.Storage() as (x:chararray,y:chararray,z:long);" + + "b = filter a by x == 'dog';" + + "c = cube b by cube(x,y);" + + "d = foreach c generate flatten(group), COUNT_STAR(cube) as count, SUM(cube.z) as total;" + + "store d into 'output' using mock.Storage();"; + + Util.registerMultiLineQuery(pigServer, query); + // Iterator<Tuple> it = pigServer.openIterator("d"); + + Set<Tuple> expected = ImmutableSet.of( + tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, (long) 12)), + tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, (long) 14)), + tf.newTuple(Lists.newArrayList("dog", "naples", (long) 1, (long) 5)), + tf.newTuple(Lists.newArrayList("dog", null, (long) 3, (long) 31)), + tf.newTuple(Lists.newArrayList(null, "miami", (long) 1, (long) 12)), + tf.newTuple(Lists.newArrayList(null, "tampa", (long) 1, (long) 14)), + tf.newTuple(Lists.newArrayList(null, "naples", (long) 1, (long) 5)), + tf.newTuple(Lists.newArrayList(null, null, (long) 3, (long) 31))); + + List<Tuple> out = data.get("output"); + for (Tuple tup : out) { + assertTrue(expected + " contains " + tup, expected.contains(tup)); + } } @Test public void testCubeAfterOrder() throws IOException { - // test for ordering before cube operator - String query = "a = load 'input' USING mock.Storage() as (x:chararray,y:chararray,z:long);" - + "b = order a by $2;" - + "c = cube b by cube(x,y);" - + "d = foreach c generate flatten(group), COUNT_STAR(cube) as count, SUM(cube.z) as total;" - + "store d into 'output' using mock.Storage();"; - - Util.registerMultiLineQuery(pigServer, query); - - Set<Tuple> expected = ImmutableSet.of( - tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1, (long) 18)), - tf.newTuple(Lists.newArrayList("cat", "naples", (long) 1, (long) 9)), - tf.newTuple(Lists.newArrayList("cat", null, (long) 2, (long) 27)), - tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, (long) 12)), - tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, (long) 14)), - tf.newTuple(Lists.newArrayList("dog", "naples", (long) 1, (long) 5)), - tf.newTuple(Lists.newArrayList("dog", null, (long) 3, (long) 31)), - tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1, (long) 4)), - tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1, (long) 1)), - tf.newTuple(Lists.newArrayList("turtle", null, (long) 2, (long) 5)), - tf.newTuple(Lists.newArrayList(null, "miami", (long) 2, (long) 30)), - tf.newTuple(Lists.newArrayList(null, "tampa", (long) 2, (long) 18)), - tf.newTuple(Lists.newArrayList(null, "naples", (long) 3, (long) 15)), - tf.newTuple(Lists.newArrayList(null, null, (long) 7, (long) 63))); - - List<Tuple> out = data.get("output"); - for (Tuple tup : out) { - assertTrue(expected + " contains " + tup, expected.contains(tup)); - } + // test for ordering before cube operator + String query = "a = load 'input' USING mock.Storage() as (x:chararray,y:chararray,z:long);" + + "b = order a by $2;" + + "c = cube b by cube(x,y);" + + "d = foreach c generate flatten(group), COUNT_STAR(cube) as count, SUM(cube.z) as total;" + + "store d into 'output' using mock.Storage();"; + + Util.registerMultiLineQuery(pigServer, query); + + Set<Tuple> expected = ImmutableSet.of( + tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1, (long) 18)), + tf.newTuple(Lists.newArrayList("cat", "naples", (long) 1, (long) 9)), + tf.newTuple(Lists.newArrayList("cat", null, (long) 2, (long) 27)), + tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, (long) 12)), + tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, (long) 14)), + tf.newTuple(Lists.newArrayList("dog", "naples", (long) 1, (long) 5)), + tf.newTuple(Lists.newArrayList("dog", null, (long) 3, (long) 31)), + tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1, (long) 4)), + tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1, (long) 1)), + tf.newTuple(Lists.newArrayList("turtle", null, (long) 2, (long) 5)), + tf.newTuple(Lists.newArrayList(null, "miami", (long) 2, (long) 30)), + tf.newTuple(Lists.newArrayList(null, "tampa", (long) 2, (long) 18)), + tf.newTuple(Lists.newArrayList(null, "naples", (long) 3, (long) 15)), + tf.newTuple(Lists.newArrayList(null, null, (long) 7, (long) 63))); + + List<Tuple> out = data.get("output"); + for (Tuple tup : out) { + assertTrue(expected + " contains " + tup, expected.contains(tup)); + } } @Test public void testCubeAfterJoin() throws IOException { - // test for cubing on joined relations - String query = "a = load 'input1' USING mock.Storage() as (a1:chararray,b1,c1,d1); " - + "b = load 'input' USING mock.Storage() as (a2,b2,c2:long,d2:chararray);" - + "c = join a by a1, b by d2;" - + "d = cube c by cube($4,$5);" - + "e = foreach d generate flatten(group), COUNT_STAR(cube) as count, SUM(cube.c2) as total;" - + "store e into 'output' using mock.Storage();"; - - Util.registerMultiLineQuery(pigServer, query); - - Set<Tuple> expected = ImmutableSet.of( - tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1, (long) 18)), - tf.newTuple(Lists.newArrayList("cat", null, (long) 1, (long) 18)), - tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, (long) 12)), - tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, (long) 14)), - tf.newTuple(Lists.newArrayList("dog", null, (long) 2, (long) 26)), - tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1, (long) 4)), - tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1, (long) 1)), - tf.newTuple(Lists.newArrayList("turtle", null, (long) 2, (long) 5)), - tf.newTuple(Lists.newArrayList(null, "miami", (long) 2, (long) 30)), - tf.newTuple(Lists.newArrayList(null, "tampa", (long) 2, (long) 18)), - tf.newTuple(Lists.newArrayList(null, "naples", (long) 1, (long) 1)), - tf.newTuple(Lists.newArrayList(null, null, (long) 5, (long) 49))); - - List<Tuple> out = data.get("output"); - for (Tuple tup : out) { - assertTrue(expected + " contains " + tup, expected.contains(tup)); - } + // test for cubing on joined relations + String query = "a = load 'input1' USING mock.Storage() as (a1:chararray,b1,c1,d1); " + + "b = load 'input' USING mock.Storage() as (a2,b2,c2:long,d2:chararray);" + + "c = join a by a1, b by d2;" + + "d = cube c by cube($4,$5);" + + "e = foreach d generate flatten(group), COUNT_STAR(cube) as count, SUM(cube.c2) as total;" + + "store e into 'output' using mock.Storage();"; + + Util.registerMultiLineQuery(pigServer, query); + + Set<Tuple> expected = ImmutableSet.of( + tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1, (long) 18)), + tf.newTuple(Lists.newArrayList("cat", null, (long) 1, (long) 18)), + tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, (long) 12)), + tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, (long) 14)), + tf.newTuple(Lists.newArrayList("dog", null, (long) 2, (long) 26)), + tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1, (long) 4)), + tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1, (long) 1)), + tf.newTuple(Lists.newArrayList("turtle", null, (long) 2, (long) 5)), + tf.newTuple(Lists.newArrayList(null, "miami", (long) 2, (long) 30)), + tf.newTuple(Lists.newArrayList(null, "tampa", (long) 2, (long) 18)), + tf.newTuple(Lists.newArrayList(null, "naples", (long) 1, (long) 1)), + tf.newTuple(Lists.newArrayList(null, null, (long) 5, (long) 49))); + + List<Tuple> out = data.get("output"); + for (Tuple tup : out) { + assertTrue(expected + " contains " + tup, expected.contains(tup)); + } } @Test public void testCubeAfterCogroup() throws IOException { - // test for cubing on co-grouped relation - String query = "a = load 'input1' USING mock.Storage() as (a1:chararray,b1,c1,d1); " - + "b = load 'input' USING mock.Storage() as (a2,b2,c2:long,d2:chararray);" - + "c = cogroup a by a1, b by d2;" - + "d = foreach c generate flatten(a), flatten(b);" - + "e = cube d by cube(a2,b2);" - + "f = foreach e generate flatten(group), COUNT_STAR(cube) as count, SUM(cube.c2) as total;" - + "store f into 'output' using mock.Storage();"; - - Util.registerMultiLineQuery(pigServer, query); - - Set<Tuple> expected = ImmutableSet.of( - tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1, (long) 18)), - tf.newTuple(Lists.newArrayList("cat", null, (long) 1, (long) 18)), - tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, (long) 12)), - tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, (long) 14)), - tf.newTuple(Lists.newArrayList("dog", null, (long) 2, (long) 26)), - tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1, (long) 4)), - tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1, (long) 1)), - tf.newTuple(Lists.newArrayList("turtle", null, (long) 2, (long) 5)), - tf.newTuple(Lists.newArrayList(null, "miami", (long) 2, (long) 30)), - tf.newTuple(Lists.newArrayList(null, "tampa", (long) 2, (long) 18)), - tf.newTuple(Lists.newArrayList(null, "naples", (long) 1, (long) 1)), - tf.newTuple(Lists.newArrayList(null, null, (long) 5, (long) 49))); - - List<Tuple> out = data.get("output"); - for (Tuple tup : out) { - assertTrue(expected + " contains " + tup, expected.contains(tup)); - } + // test for cubing on co-grouped relation + String query = "a = load 'input1' USING mock.Storage() as (a1:chararray,b1,c1,d1); " + + "b = load 'input' USING mock.Storage() as (a2,b2,c2:long,d2:chararray);" + + "c = cogroup a by a1, b by d2;" + + "d = foreach c generate flatten(a), flatten(b);" + + "e = cube d by cube(a2,b2);" + + "f = foreach e generate flatten(group), COUNT_STAR(cube) as count, SUM(cube.c2) as total;" + + "store f into 'output' using mock.Storage();"; + + Util.registerMultiLineQuery(pigServer, query); + + Set<Tuple> expected = ImmutableSet.of( + tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1, (long) 18)), + tf.newTuple(Lists.newArrayList("cat", null, (long) 1, (long) 18)), + tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, (long) 12)), + tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, (long) 14)), + tf.newTuple(Lists.newArrayList("dog", null, (long) 2, (long) 26)), + tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1, (long) 4)), + tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1, (long) 1)), + tf.newTuple(Lists.newArrayList("turtle", null, (long) 2, (long) 5)), + tf.newTuple(Lists.newArrayList(null, "miami", (long) 2, (long) 30)), + tf.newTuple(Lists.newArrayList(null, "tampa", (long) 2, (long) 18)), + tf.newTuple(Lists.newArrayList(null, "naples", (long) 1, (long) 1)), + tf.newTuple(Lists.newArrayList(null, null, (long) 5, (long) 49))); + + List<Tuple> out = data.get("output"); + for (Tuple tup : out) { + assertTrue(expected + " contains " + tup, expected.contains(tup)); + } } @Test public void testCubeWithNULLs() throws IOException { - // test for dimension values with legitimate null values - String query = "a = load 'input3' USING mock.Storage() as (x:chararray,y:chararray,z:long);" - + "b = cube a by cube(x,y);" - + "c = foreach b generate flatten(group) as (type,location), SUM(cube.z) as total;" - + "store c into 'output' using mock.Storage();"; - - Util.registerMultiLineQuery(pigServer, query); - - Set<Tuple> expected = ImmutableSet.of( - tf.newTuple(Lists.newArrayList("dog", "miami", (long) 12)), - tf.newTuple(Lists.newArrayList("dog", null, (long) 12)), - tf.newTuple(Lists.newArrayList(null, "miami", (long) 30)), - tf.newTuple(Lists.newArrayList(null, null, (long) 30)), - tf.newTuple(Lists.newArrayList("unknown", "miami", (long) 18)), - tf.newTuple(Lists.newArrayList("unknown", null, (long) 18))); - - List<Tuple> out = data.get("output"); - for (Tuple tup : out) { - assertTrue(expected + " contains " + tup, expected.contains(tup)); - } + // test for dimension values with legitimate null values + String query = "a = load 'input3' USING mock.Storage() as (x:chararray,y:chararray,z:long);" + + "b = cube a by cube(x,y);" + + "c = foreach b generate flatten(group) as (type,location), SUM(cube.z) as total;" + + "store c into 'output' using mock.Storage();"; + + Util.registerMultiLineQuery(pigServer, query); + + Set<Tuple> expected = ImmutableSet.of( + tf.newTuple(Lists.newArrayList("dog", "miami", (long) 12)), + tf.newTuple(Lists.newArrayList("dog", null, (long) 12)), + tf.newTuple(Lists.newArrayList(null, "miami", (long) 30)), + tf.newTuple(Lists.newArrayList(null, null, (long) 30)), + tf.newTuple(Lists.newArrayList("unknown", "miami", (long) 18)), + tf.newTuple(Lists.newArrayList("unknown", null, (long) 18))); + + List<Tuple> out = data.get("output"); + for (Tuple tup : out) { + assertTrue(expected + " contains " + tup, expected.contains(tup)); + } } @Test public void testCubeWithNULLAndFilter() throws IOException { - // test for dimension values with legitimate null values - // followed by filter - String query = "a = load 'input3' USING mock.Storage() as (x:chararray,y:chararray,z:long);" - + "b = cube a by cube(x,y);" - + "c = foreach b generate flatten(group) as (type,location), SUM(cube.z) as total;" - + "d = filter c by type!='unknown';" - + "store d into 'output' using mock.Storage();"; - - Util.registerMultiLineQuery(pigServer, query); - - Set<Tuple> expected = ImmutableSet.of( - tf.newTuple(Lists.newArrayList("dog", "miami", (long) 12)), - tf.newTuple(Lists.newArrayList("dog", null, (long) 12))); - - List<Tuple> out = data.get("output"); - for (Tuple tup : out) { - assertTrue(expected + " contains " + tup, expected.contains(tup)); - } + // test for dimension values with legitimate null values + // followed by filter + String query = "a = load 'input3' USING mock.Storage() as (x:chararray,y:chararray,z:long);" + + "b = cube a by cube(x,y);" + + "c = foreach b generate flatten(group) as (type,location), SUM(cube.z) as total;" + + "d = filter c by type!='unknown';" + + "store d into 'output' using mock.Storage();"; + + Util.registerMultiLineQuery(pigServer, query); + + Set<Tuple> expected = ImmutableSet.of( + tf.newTuple(Lists.newArrayList("dog", "miami", (long) 12)), + tf.newTuple(Lists.newArrayList("dog", null, (long) 12))); + + List<Tuple> out = data.get("output"); + for (Tuple tup : out) { + assertTrue(expected + " contains " + tup, expected.contains(tup)); + } } @Test public void testRollupAfterCogroup() throws IOException { - // test for cubing on co-grouped relation - String query = "a = load 'input1' USING mock.Storage() as (a1:chararray,b1,c1,d1); " - + "b = load 'input' USING mock.Storage() as (a2,b2,c2:long,d2:chararray);" - + "c = cogroup a by a1, b by d2;" - + "d = foreach c generate flatten(a), flatten(b);" - + "e = cube d by rollup(a2,b2);" - + "f = foreach e generate flatten(group), COUNT(cube) as count, SUM(cube.c2) as total;" - + "store f into 'output' using mock.Storage();"; - - Util.registerMultiLineQuery(pigServer, query); - - Set<Tuple> expected = ImmutableSet.of( - tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1, (long) 18)), - tf.newTuple(Lists.newArrayList("cat", null, (long) 1, (long) 18)), - tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, (long) 12)), - tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, (long) 14)), - tf.newTuple(Lists.newArrayList("dog", null, (long) 2, (long) 26)), - tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1, (long) 4)), - tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1, (long) 1)), - tf.newTuple(Lists.newArrayList("turtle", null, (long) 2, (long) 5)), - tf.newTuple(Lists.newArrayList(null, null, (long) 5, (long) 49))); - - List<Tuple> out = data.get("output"); - for (Tuple tup : out) { - assertTrue(expected + " contains " + tup, expected.contains(tup)); - } + // test for cubing on co-grouped relation + String query = "a = load 'input1' USING mock.Storage() as (a1:chararray,b1,c1,d1); " + + "b = load 'input' USING mock.Storage() as (a2,b2,c2:long,d2:chararray);" + + "c = cogroup a by a1, b by d2;" + + "d = foreach c generate flatten(a), flatten(b);" + + "e = cube d by rollup(a2,b2);" + + "f = foreach e generate flatten(group), COUNT(cube) as count, SUM(cube.c2) as total;" + + "store f into 'output' using mock.Storage();"; + + Util.registerMultiLineQuery(pigServer, query); + + Set<Tuple> expected = ImmutableSet.of( + tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1, (long) 18)), + tf.newTuple(Lists.newArrayList("cat", null, (long) 1, (long) 18)), + tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, (long) 12)), + tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, (long) 14)), + tf.newTuple(Lists.newArrayList("dog", null, (long) 2, (long) 26)), + tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1, (long) 4)), + tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1, (long) 1)), + tf.newTuple(Lists.newArrayList("turtle", null, (long) 2, (long) 5)), + tf.newTuple(Lists.newArrayList(null, null, (long) 5, (long) 49))); + + List<Tuple> out = data.get("output"); + for (Tuple tup : out) { + assertTrue(expected + " contains " + tup, expected.contains(tup)); + } } @Test - public void testIllustrate() throws IOException { + public void testIllustrate() throws Exception { // test for illustrate + Assume.assumeTrue("illustrate does not work in tez (PIG-3993)", !Util.getLocalTestMode().toString().startsWith("TEZ")); String query = "a = load 'input' USING mock.Storage() as (a1:chararray,b1:chararray,c1:long); " + "b = cube a by cube(a1,b1);"; - Util.registerMultiLineQuery(pigServer, query); - Map<Operator, DataBag> examples = pigServer.getExamples("b"); - assertTrue(examples != null); + Util.registerMultiLineQuery(pigServer, query); + Map<Operator, DataBag> examples = pigServer.getExamples("b"); + assertTrue(examples != null); } @Test - public void testExplainCube() throws IOException { - // test for explain - String query = "a = load 'input' USING mock.Storage() as (a1:chararray,b1:chararray,c1:long); " - + "b = cube a by cube(a1,b1);"; + public void testRollupHIIAfterCogroup() throws IOException { + // test for cubing on co-grouped relation + String query = "a = load 'input1' USING mock.Storage() as (a1:chararray,b1,c1,d1); " + + "b = load 'input' USING mock.Storage() as (a2,b2,c2:long,d2:chararray);" + + "c = cogroup a by a1, b by d2;" + + "d = foreach c generate flatten(a), flatten(b);" + + "e = cube d by rollup(a2,b2) pivot 1;" + + "f = foreach e generate flatten(group), COUNT(cube) as count, SUM(cube.c2) as total;" + + "store f into 'output' using mock.Storage();"; + + Util.registerMultiLineQuery(pigServer, query); + + Set<Tuple> expected = ImmutableSet.of( + tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1, (long) 18)), + tf.newTuple(Lists.newArrayList("cat", null, (long) 1, (long) 18)), + tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, (long) 12)), + tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, (long) 14)), + tf.newTuple(Lists.newArrayList("dog", null, (long) 2, (long) 26)), + tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1, (long) 4)), + tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1, (long) 1)), + tf.newTuple(Lists.newArrayList("turtle", null, (long) 2, (long) 5)), + tf.newTuple(Lists.newArrayList(null, null, (long) 5, (long) 49))); + + List<Tuple> out = data.get("output"); + for (Tuple tup : out) { + assertTrue(expected + " contains " + tup, expected.contains(tup)); + } + } - Util.registerMultiLineQuery(pigServer, query); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - PrintStream ps = new PrintStream(baos); - pigServer.explain("b", ps); - assertTrue(baos.toString().contains("CubeDimensions")); + @Test + public void testExplainCube() throws IOException { + // test for explain + String query = "a = load 'input' USING mock.Storage() as (a1:chararray,b1:chararray,c1:long); " + + "b = cube a by cube(a1,b1);"; + + Util.registerMultiLineQuery(pigServer, query); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + PrintStream ps = new PrintStream(baos); + pigServer.explain("b", ps); + assertTrue(baos.toString().contains("CubeDimensions")); } @Test public void testExplainRollup() throws IOException { - // test for explain - String query = "a = load 'input' USING mock.Storage() as (a1:chararray,b1:chararray,c1:long); " - + "b = cube a by rollup(a1,b1);"; + // test for explain + String query = "a = load 'input' USING mock.Storage() as (a1:chararray,b1:chararray,c1:long); " + + "b = cube a by rollup(a1,b1);"; + + Util.registerMultiLineQuery(pigServer, query); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + PrintStream ps = new PrintStream(baos); + pigServer.explain("b", ps); + assertTrue(baos.toString().contains("RollupDimensions")); + } - Util.registerMultiLineQuery(pigServer, query); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - PrintStream ps = new PrintStream(baos); - pigServer.explain("b", ps); - assertTrue(baos.toString().contains("RollupDimensions")); + @Test + public void testExplainRollupHII() throws IOException { + // test for explain + String query = "a = load 'input' USING mock.Storage() as (a1:chararray,b1:chararray,c1:long); " + + "b = cube a by rollup(a1,b1) pivot 1;"; + + Util.registerMultiLineQuery(pigServer, query); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + PrintStream ps = new PrintStream(baos); + pigServer.explain("b", ps); + assertTrue(baos.toString().contains("RollupDimensions")); } @Test public void testDescribe() throws IOException { - // test for describe - String query = "a = load 'input' USING mock.Storage() as (a1:chararray,b1:chararray,c1:long); " - + "b = cube a by cube(a1,b1);"; - - Util.registerMultiLineQuery(pigServer, query); - Schema sch = pigServer.dumpSchema("b"); - for (String alias : sch.getAliases()) { - if (alias.compareTo("cube") == 0) { - assertTrue(alias.contains("cube")); - } - } + // test for describe + String query = "a = load 'input' USING mock.Storage() as (a1:chararray,b1:chararray,c1:long); " + + "b = cube a by cube(a1,b1);"; + + Util.registerMultiLineQuery(pigServer, query); + Schema sch = pigServer.dumpSchema("b"); + for (String alias : sch.getAliases()) { + if (alias.compareTo("cube") == 0) { + assertTrue(alias.contains("cube")); + } + } } }
Modified: pig/branches/spark/test/org/apache/pig/test/TestDataBagAccess.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestDataBagAccess.java?rev=1654955&r1=1654954&r2=1654955&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestDataBagAccess.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestDataBagAccess.java Tue Jan 27 02:27:45 2015 @@ -29,7 +29,6 @@ import java.util.Iterator; import java.util.List; import java.util.Properties; -import org.apache.pig.ExecType; import org.apache.pig.PigServer; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.builtin.BinStorage; @@ -52,7 +51,7 @@ public class TestDataBagAccess { @Before public void setUp() throws Exception{ - pigServer = new PigServer(ExecType.LOCAL, new Properties()); + pigServer = new PigServer(Util.getLocalTestMode(), new Properties()); } @Test Modified: pig/branches/spark/test/org/apache/pig/test/TestEvalPipelineLocal.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestEvalPipelineLocal.java?rev=1654955&r1=1654954&r2=1654955&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestEvalPipelineLocal.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestEvalPipelineLocal.java Tue Jan 27 02:27:45 2015 @@ -39,7 +39,6 @@ import junit.framework.Assert; import org.apache.hadoop.mapreduce.Job; import org.apache.pig.ComparisonFunc; import org.apache.pig.EvalFunc; -import org.apache.pig.ExecType; import org.apache.pig.FuncSpec; import org.apache.pig.PigServer; import org.apache.pig.backend.executionengine.ExecException; @@ -60,6 +59,7 @@ import org.apache.pig.impl.util.Pair; import org.apache.pig.impl.util.UDFContext; import org.apache.pig.impl.util.Utils; import org.apache.pig.test.utils.Identity; +import org.junit.Assume; import org.junit.Before; import org.junit.Test; @@ -73,7 +73,7 @@ public class TestEvalPipelineLocal { @Before public void setUp() throws Exception{ - pigServer = new PigServer(ExecType.LOCAL); + pigServer = new PigServer(Util.getLocalTestMode()); } static public class MyBagFunction extends EvalFunc<DataBag>{ @@ -1030,6 +1030,8 @@ public class TestEvalPipelineLocal { @Test public void testExplainInDotGraph() throws Exception{ + Assume.assumeTrue("Skip this test for TEZ since TEZ does not support explain in dot format", + !Util.getLocalTestMode().toString().startsWith("TEZ")); pigServer.registerQuery("a = load 'voter' using " + PigStorage.class.getName() + "(',') as (name, age, registration, contributions);"); pigServer.registerQuery("b = filter a by age < 50;"); pigServer.registerQuery("c = group b by registration;"); @@ -1139,10 +1141,16 @@ public class TestEvalPipelineLocal { Schema expectedSchema = Utils.getSchemaFromString( "group: bytearray"); Assert.assertEquals(expectedSchema, dumpedSchema); + TupleFactory tf = TupleFactory.getInstance(); + List<Tuple> expected = new ArrayList<Tuple>(); + Tuple t = tf.newTuple(1); + t.set(0, new DataByteArray("NYSE".getBytes())); + expected.add(t); + t = tf.newTuple(1); + t.set(0, new DataByteArray("NASDAQ".getBytes())); + expected.add(t); Iterator<Tuple> iter = pigServer.openIterator("zzz"); - Assert.assertTrue(iter.next().toString().equals("(NYSE)")); - Assert.assertTrue(iter.next().toString().equals("(NASDAQ)")); - Assert.assertFalse(iter.hasNext()); + Util.checkQueryOutputsAfterSort(iter, expected); } // Self cross, see PIG-3292 Modified: pig/branches/spark/test/org/apache/pig/test/TestFetch.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestFetch.java?rev=1654955&r1=1654954&r2=1654955&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestFetch.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestFetch.java Tue Jan 27 02:27:45 2015 @@ -33,12 +33,11 @@ import java.util.List; import java.util.Properties; import java.util.Random; -import org.apache.pig.ExecType; import org.apache.pig.PigConfiguration; import org.apache.pig.PigServer; +import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine; import org.apache.pig.backend.hadoop.executionengine.fetch.FetchLauncher; import org.apache.pig.backend.hadoop.executionengine.fetch.FetchOptimizer; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRExecutionEngine; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad; @@ -94,7 +93,7 @@ public class TestFetch { @Before public void setUp() throws Exception{ - pigServer = new PigServer(ExecType.LOCAL, new Properties()); + pigServer = new PigServer(Util.getLocalTestMode(), new Properties()); // force direct fetch mode pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_OPT_FETCH, "true"); } @@ -123,7 +122,7 @@ public class TestFetch { LogicalPlan lp = ParserTestingUtils.generateLogicalPlan(query); - PhysicalPlan pp = ((MRExecutionEngine) pigServer.getPigContext().getExecutionEngine()) + PhysicalPlan pp = ((HExecutionEngine) pigServer.getPigContext().getExecutionEngine()) .compile(lp, null); boolean planFetchable = FetchOptimizer.isPlanFetchable(pigServer.getPigContext(), pp); @@ -162,7 +161,7 @@ public class TestFetch { LogicalPlan lp = ParserTestingUtils.generateLogicalPlan(query); - PhysicalPlan pp = ((MRExecutionEngine) pigServer.getPigContext().getExecutionEngine()) + PhysicalPlan pp = ((HExecutionEngine) pigServer.getPigContext().getExecutionEngine()) .compile(lp, null); boolean planFetchable = FetchOptimizer.isPlanFetchable(pigServer.getPigContext(), pp); @@ -228,7 +227,7 @@ public class TestFetch { pigServer.setBatchOn(); LogicalPlan lp = TestPigStats.getLogicalPlan(pigServer); - PhysicalPlan pp = ((MRExecutionEngine) + PhysicalPlan pp = ((HExecutionEngine) pigServer.getPigContext().getExecutionEngine()).compile(lp, null); boolean planFetchable = FetchOptimizer.isPlanFetchable(pigServer.getPigContext(), pp); assertFalse(planFetchable); @@ -257,7 +256,7 @@ public class TestFetch { pigServer.setBatchOn(); LogicalPlan lp = TestPigStats.getLogicalPlan(pigServer); - PhysicalPlan pp = ((MRExecutionEngine) + PhysicalPlan pp = ((HExecutionEngine) pigServer.getPigContext().getExecutionEngine()).compile(lp, null); boolean planFetchable = FetchOptimizer.isPlanFetchable(pigServer.getPigContext(), pp); assertFalse(planFetchable); Modified: pig/branches/spark/test/org/apache/pig/test/TestFilterOpNumeric.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestFilterOpNumeric.java?rev=1654955&r1=1654954&r2=1654955&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestFilterOpNumeric.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestFilterOpNumeric.java Tue Jan 27 02:27:45 2015 @@ -28,7 +28,6 @@ import java.util.Iterator; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.pig.ExecType; import org.apache.pig.PigServer; import org.apache.pig.builtin.PigStorage; import org.apache.pig.data.Tuple; @@ -44,7 +43,7 @@ public class TestFilterOpNumeric { @Before public void setUp() throws Exception { - pig = new PigServer(ExecType.LOCAL); + pig = new PigServer(Util.getLocalTestMode()); } @Test Modified: pig/branches/spark/test/org/apache/pig/test/TestFilterOpString.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestFilterOpString.java?rev=1654955&r1=1654954&r2=1654955&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestFilterOpString.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestFilterOpString.java Tue Jan 27 02:27:45 2015 @@ -28,7 +28,6 @@ import java.util.Iterator; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.pig.ExecType; import org.apache.pig.PigServer; import org.apache.pig.builtin.PigStorage; import org.apache.pig.data.Tuple; @@ -46,7 +45,7 @@ public class TestFilterOpString { @Before public void setUp() throws Exception { FileLocalizer.deleteTempFiles(); - pig = new PigServer(ExecType.LOCAL); + pig = new PigServer(Util.getLocalTestMode()); } @Test Modified: pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlan.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlan.java?rev=1654955&r1=1654954&r2=1654955&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlan.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlan.java Tue Jan 27 02:27:45 2015 @@ -214,18 +214,10 @@ public class TestForEachNestedPlan { "generate group, MIN(E); };"); Iterator<Tuple> iter = pig.openIterator("C"); - List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings( new String[] {"(10,68)", "(20,78)"}); - - int counter = 0; - while (iter.hasNext()) { - assertEquals(expectedResults.get(counter++).toString(), - iter.next().toString()); - } - - assertEquals(expectedResults.size(), counter); + Util.checkQueryOutputsAfterSort(iter, expectedResults); } finally{ new File(INPUT_FILE).delete(); try { @@ -263,14 +255,7 @@ public class TestForEachNestedPlan { List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings( new String[] {"(1,3)", "(1,4)", "(2,3)", "(2,4)"}); - - int counter = 0; - while (iter.hasNext()) { - assertEquals(expectedResults.get(counter++).toString(), - iter.next().toString()); - } - - assertEquals(expectedResults.size(), counter); + Util.checkQueryOutputsAfterSort(iter, expectedResults); } finally{ new File(INPUT_FILE).delete(); try { Modified: pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java?rev=1654955&r1=1654954&r2=1654955&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java Tue Jan 27 02:27:45 2015 @@ -19,7 +19,6 @@ package org.apache.pig.test; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import java.io.File; import java.io.FileOutputStream; @@ -30,7 +29,6 @@ import java.util.Iterator; import java.util.List; import java.util.Random; -import org.apache.pig.ExecType; import org.apache.pig.PigServer; import org.apache.pig.data.Tuple; import org.apache.pig.test.utils.TestHelper; @@ -41,7 +39,7 @@ public class TestForEachNestedPlanLocal private PigServer pig ; public TestForEachNestedPlanLocal() throws Throwable { - pig = new PigServer(ExecType.LOCAL) ; + pig = new PigServer(Util.getLocalTestMode()) ; } Boolean[] nullFlags = new Boolean[]{ false, true }; @@ -79,19 +77,9 @@ public class TestForEachNestedPlanLocal pig.registerQuery("c = foreach b { " + " c1 = limit $1 5; " + " generate COUNT(c1); " + "};"); Iterator<Tuple> it = pig.openIterator("c"); - Tuple t = null; - long count[] = new long[3]; - for (int i = 0; i < 3 && it.hasNext(); i++) { - t = it.next(); - count[i] = (Long)t.get(0); - } - - assertFalse(it.hasNext()); + List<Tuple> expected = Util.getTuplesFromConstantTupleStrings(new String[] {"(5L)", "(5L)", "(3L)" }); - // Pig's previous local mode was screwed up correcting that - assertEquals(5L, count[0]); - assertEquals(5L, count[1]); - assertEquals(3L, count[2]); + Util.checkQueryOutputsAfterSort(it, expected); } @Test Modified: pig/branches/spark/test/org/apache/pig/test/TestForEachStar.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestForEachStar.java?rev=1654955&r1=1654954&r2=1654955&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestForEachStar.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestForEachStar.java Tue Jan 27 02:27:45 2015 @@ -20,17 +20,12 @@ package org.apache.pig.test; import static org.junit.Assert.*; import java.io.File; -import java.io.IOException; import java.util.Iterator; -import java.util.List; -import org.apache.pig.ExecType; import org.apache.pig.PigServer; -import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.data.DataByteArray; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; -import org.apache.pig.parser.ParserException; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -65,8 +60,8 @@ public class TestForEachStar { } @Test - public void testForeachStarSchemaUnkown() throws IOException, ParserException{ - PigServer pig = new PigServer(ExecType.LOCAL); + public void testForeachStarSchemaUnkown() throws Exception { + PigServer pig = new PigServer(Util.getLocalTestMode()); String query = " l1 = load '" + INPUT_FILE + "' ;" + "f1 = foreach l1 generate * ;" Modified: pig/branches/spark/test/org/apache/pig/test/TestHBaseStorage.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestHBaseStorage.java?rev=1654955&r1=1654954&r2=1654955&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestHBaseStorage.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestHBaseStorage.java Tue Jan 27 02:27:45 2015 @@ -1047,6 +1047,190 @@ public class TestHBaseStorage { /** * load from hbase 'TESTTABLE_1' using HBaseBinary format, and store it into + * 'TESTTABLE_2' using HBaseBinaryFormat setting the timestamp + * + * @throws IOException + */ + @Test + public void testStoreToHBase_1_with_timestamp() throws IOException { + prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary); + prepareTable(TESTTABLE_2, false, DataFormat.HBaseBinary); + scanTable1(pig, DataFormat.HBaseBinary); + long timestamp = System.currentTimeMillis(); + pig.registerQuery("b = FOREACH a GENERATE rowKey, " + timestamp + "l, col_a, col_b, col_c;"); + pig.store("b", "hbase://" + TESTTABLE_2, + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('" + + TESTCOLUMN_A + " " + TESTCOLUMN_B + " " + + TESTCOLUMN_C + "','-caster HBaseBinaryConverter -includeTimestamp true')"); + + HTable table = new HTable(conf, TESTTABLE_2); + ResultScanner scanner = table.getScanner(new Scan()); + Iterator<Result> iter = scanner.iterator(); + int i = 0; + for (i = 0; iter.hasNext(); ++i) { + Result result = iter.next(); + String v = String.valueOf(i); + String rowKey = Bytes.toString(result.getRow()); + int col_a = Bytes.toInt(getColValue(result, TESTCOLUMN_A)); + double col_b = Bytes.toDouble(getColValue(result, TESTCOLUMN_B)); + String col_c = Bytes.toString(getColValue(result, TESTCOLUMN_C)); + + long col_a_ts = getColTimestamp(result, TESTCOLUMN_A); + long col_b_ts = getColTimestamp(result, TESTCOLUMN_B); + long col_c_ts = getColTimestamp(result, TESTCOLUMN_C); + + Assert.assertEquals(timestamp, col_a_ts); + Assert.assertEquals(timestamp, col_b_ts); + Assert.assertEquals(timestamp, col_c_ts); + + Assert.assertEquals("00".substring(v.length()) + v, rowKey); + Assert.assertEquals(i, col_a); + Assert.assertEquals(i + 0.0, col_b, 1e-6); + Assert.assertEquals("Text_" + i, col_c); + } + Assert.assertEquals(100, i); + table.close(); + } + + /** + * load from hbase 'TESTTABLE_1' using HBaseBinary format, and store it into + * 'TESTTABLE_2' using HBaseBinaryFormat setting the timestamp + * + * @throws IOException + */ + @Test + public void testStoreToHBase_1_with_datetime_timestamp() throws IOException { + prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary); + prepareTable(TESTTABLE_2, false, DataFormat.HBaseBinary); + scanTable1(pig, DataFormat.HBaseBinary); + long timestamp = System.currentTimeMillis(); + pig.registerQuery("b = FOREACH a GENERATE rowKey, ToDate(" + timestamp + "l), col_a, col_b, col_c;"); + pig.store("b", "hbase://" + TESTTABLE_2, + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('" + + TESTCOLUMN_A + " " + TESTCOLUMN_B + " " + + TESTCOLUMN_C + "','-caster HBaseBinaryConverter -includeTimestamp true')"); + + HTable table = new HTable(conf, TESTTABLE_2); + ResultScanner scanner = table.getScanner(new Scan()); + Iterator<Result> iter = scanner.iterator(); + int i = 0; + for (i = 0; iter.hasNext(); ++i) { + Result result = iter.next(); + String v = String.valueOf(i); + String rowKey = Bytes.toString(result.getRow()); + int col_a = Bytes.toInt(getColValue(result, TESTCOLUMN_A)); + double col_b = Bytes.toDouble(getColValue(result, TESTCOLUMN_B)); + String col_c = Bytes.toString(getColValue(result, TESTCOLUMN_C)); + + long col_a_ts = getColTimestamp(result, TESTCOLUMN_A); + long col_b_ts = getColTimestamp(result, TESTCOLUMN_B); + long col_c_ts = getColTimestamp(result, TESTCOLUMN_C); + + Assert.assertEquals(timestamp, col_a_ts); + Assert.assertEquals(timestamp, col_b_ts); + Assert.assertEquals(timestamp, col_c_ts); + + Assert.assertEquals("00".substring(v.length()) + v, rowKey); + Assert.assertEquals(i, col_a); + Assert.assertEquals(i + 0.0, col_b, 1e-6); + Assert.assertEquals("Text_" + i, col_c); + } + Assert.assertEquals(100, i); + table.close(); + } + + /** + * load from hbase 'TESTTABLE_1' using HBaseBinary format, and store it into + * 'TESTTABLE_2' using HBaseBinaryFormat setting the timestamp + * + * @throws IOException + */ + @Test + public void testStoreToHBase_1_with_bytearray_timestamp() throws IOException { + prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary); + prepareTable(TESTTABLE_2, false, DataFormat.HBaseBinary); + scanTable1(pig, DataFormat.HBaseBinary); + long timestamp = System.currentTimeMillis(); + pig.registerQuery("b = FOREACH a GENERATE rowKey, " + timestamp + "l as timestamp:bytearray, col_a, col_b, col_c;"); + pig.store("b", "hbase://" + TESTTABLE_2, + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('" + + TESTCOLUMN_A + " " + TESTCOLUMN_B + " " + + TESTCOLUMN_C + "','-caster HBaseBinaryConverter -includeTimestamp true')"); + + HTable table = new HTable(conf, TESTTABLE_2); + ResultScanner scanner = table.getScanner(new Scan()); + Iterator<Result> iter = scanner.iterator(); + int i = 0; + for (i = 0; iter.hasNext(); ++i) { + Result result = iter.next(); + String v = String.valueOf(i); + String rowKey = Bytes.toString(result.getRow()); + int col_a = Bytes.toInt(getColValue(result, TESTCOLUMN_A)); + double col_b = Bytes.toDouble(getColValue(result, TESTCOLUMN_B)); + String col_c = Bytes.toString(getColValue(result, TESTCOLUMN_C)); + + long col_a_ts = getColTimestamp(result, TESTCOLUMN_A); + long col_b_ts = getColTimestamp(result, TESTCOLUMN_B); + long col_c_ts = getColTimestamp(result, TESTCOLUMN_C); + + Assert.assertEquals(timestamp, col_a_ts); + Assert.assertEquals(timestamp, col_b_ts); + Assert.assertEquals(timestamp, col_c_ts); + + Assert.assertEquals("00".substring(v.length()) + v, rowKey); + Assert.assertEquals(i, col_a); + Assert.assertEquals(i + 0.0, col_b, 1e-6); + Assert.assertEquals("Text_" + i, col_c); + } + Assert.assertEquals(100, i); + table.close(); + } + + /** + * load from hbase 'TESTTABLE_1' using HBaseBinary format, and store it into + * 'TESTTABLE_1' deleting odd row keys + * + * @throws IOException + */ + @Test + public void testStoreToHBase_1_with_delete() throws IOException { + prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary); + scanTable1(pig, DataFormat.HBaseBinary); + pig.registerQuery("b = FOREACH a GENERATE rowKey, (boolean)(((int)rowKey) % 2), col_a, col_b, col_c;"); + pig.store("b", "hbase://" + TESTTABLE_1, + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('" + + TESTCOLUMN_A + " " + TESTCOLUMN_B + " " + + TESTCOLUMN_C + "','-caster HBaseBinaryConverter -includeTombstone true')"); + + HTable table = new HTable(conf, TESTTABLE_1); + ResultScanner scanner = table.getScanner(new Scan()); + Iterator<Result> iter = scanner.iterator(); + int count = 0; + for (int i = 0; iter.hasNext(); i = i + 2) { + Result result = iter.next(); + String v = String.valueOf(i); + String rowKey = Bytes.toString(result.getRow()); + int col_a = Bytes.toInt(getColValue(result, TESTCOLUMN_A)); + double col_b = Bytes.toDouble(getColValue(result, TESTCOLUMN_B)); + String col_c = Bytes.toString(getColValue(result, TESTCOLUMN_C)); + + long col_a_ts = getColTimestamp(result, TESTCOLUMN_A); + long col_b_ts = getColTimestamp(result, TESTCOLUMN_B); + long col_c_ts = getColTimestamp(result, TESTCOLUMN_C); + + Assert.assertEquals("00".substring(v.length()) + v, rowKey); + Assert.assertEquals(i, col_a); + Assert.assertEquals(i + 0.0, col_b, 1e-6); + Assert.assertEquals("Text_" + i, col_c); + + count++; + } + Assert.assertEquals(50, count); + table.close(); + } + + /** + * load from hbase 'TESTTABLE_1' using HBaseBinary format, and store it into * 'TESTTABLE_2' using UTF-8 Plain Text format * * @throws IOException @@ -1094,6 +1278,7 @@ public class TestHBaseStorage { Object key = "somekey"; byte type = DataType.CHARARRAY; Assert.assertFalse(hbaseStorage.createPut(key, type).getWriteToWAL()); + Assert.assertFalse(hbaseStorage.createDelete(key, type, System.currentTimeMillis()).getWriteToWAL()); } /** @@ -1108,6 +1293,7 @@ public class TestHBaseStorage { Object key = "somekey"; byte type = DataType.CHARARRAY; Assert.assertTrue(hbaseStorage.createPut(key, type).getWriteToWAL()); + Assert.assertTrue(hbaseStorage.createDelete(key, type, System.currentTimeMillis()).getWriteToWAL()); } /** @@ -1357,4 +1543,16 @@ public class TestHBaseStorage { return result.getValue(colArray[0], colArray[1]); } + + /** + * Helper to deal with fetching a timestamp based on a cf:colname string spec + * @param result + * @param colName + * @return + */ + private static long getColTimestamp(Result result, String colName) { + byte[][] colArray = Bytes.toByteArrays(colName.split(":")); + return result.getColumnLatestCell(colArray[0], colArray[1]).getTimestamp(); + } + }
