Modified: pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline2.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline2.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline2.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline2.java Thu Nov 27 12:49:54 2014 @@ -23,6 +23,7 @@ import java.io.IOException; import java.io.PrintStream; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -31,10 +32,15 @@ import java.util.Map; import java.util.Properties; import java.util.Random; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.pig.EvalFunc; import org.apache.pig.PigConfiguration; import org.apache.pig.PigException; import org.apache.pig.PigServer; +import org.apache.pig.backend.executionengine.ExecJob; import org.apache.pig.builtin.BinStorage; import org.apache.pig.data.BagFactory; import org.apache.pig.data.DataBag; @@ -44,6 +50,7 @@ import org.apache.pig.data.DefaultBagFac import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.PigImplConstants; +import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.logicalLayer.FrontendException; import org.apache.pig.impl.logicalLayer.schema.Schema; import org.apache.pig.impl.util.LogUtils; @@ -347,7 +354,7 @@ public class TestEvalPipeline2 { ps.close(); pigServer.registerQuery("A = LOAD '" - + Util.generateURI(Util.encodeEscape(tmpFile.toString()), pigServer + + Util.generateURI(tmpFile.toString(), pigServer .getPigContext()) + "' AS (num:int);"); pigServer.registerQuery("B = order A by num parallel 2;"); pigServer.registerQuery("C = limit B 10;"); @@ -376,7 +383,7 @@ public class TestEvalPipeline2 { ps.close(); pigServer.registerQuery("A = LOAD '" - + Util.generateURI(Util.encodeEscape(tmpFile.toString()), pigServer + + Util.generateURI(tmpFile.toString(), pigServer .getPigContext()) + "' AS (num:int);"); pigServer.registerQuery("B = order A by num parallel 2;"); pigServer.registerQuery("C = limit B 10;"); @@ -409,7 +416,7 @@ public class TestEvalPipeline2 { ps.close(); pigServer.registerQuery("A = LOAD '" - + Util.generateURI(Util.encodeEscape(tmpFile.toString()), pigServer + + Util.generateURI(tmpFile.toString(), pigServer .getPigContext()) + "' AS (num:int);"); pigServer.registerQuery("B = order A by num desc parallel 2;"); pigServer.registerQuery("C = limit B 10;"); @@ -456,8 +463,8 @@ public class TestEvalPipeline2 { ps2.println("2\t2"); ps2.close(); - pigServer.registerQuery("A = LOAD '" + Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);"); - pigServer.registerQuery("B = LOAD '" + Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' AS (b0, b1);"); + pigServer.registerQuery("A = LOAD '" + Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);"); + pigServer.registerQuery("B = LOAD '" + Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1);"); pigServer.registerQuery("C = LIMIT B 100;"); pigServer.registerQuery("D = COGROUP C BY b0, A BY a0 PARALLEL 2;"); Iterator<Tuple> iter = pigServer.openIterator("D"); @@ -1424,7 +1431,7 @@ public class TestEvalPipeline2 { public void testNonStandardDataWithoutFetch() throws Exception{ Assume.assumeTrue("Skip this test for TEZ. See PIG-3994", Util.isMapredExecType(cluster.getExecType())); Properties props = pigServer.getPigContext().getProperties(); - props.setProperty(PigConfiguration.OPT_FETCH, "false"); + props.setProperty(PigConfiguration.PIG_OPT_FETCH, "false"); String[] input1 = { "0", }; @@ -1441,7 +1448,7 @@ public class TestEvalPipeline2 { } } finally { - props.setProperty(PigConfiguration.OPT_FETCH, "true"); + props.setProperty(PigConfiguration.PIG_OPT_FETCH, "true"); } } @@ -1604,4 +1611,53 @@ public class TestEvalPipeline2 { Assert.assertFalse(iter.hasNext()); } + + @SuppressWarnings("unchecked") + @Test + public void testCrossAfterGroupAll() throws Exception{ + String[] input = { + "1\tA", + "2\tB", + "3\tC", + "4\tD", + }; + + Util.createInputFile(cluster, "table_testCrossAfterGroupAll", input); + + try { + pigServer.getPigContext().getProperties().setProperty("pig.exec.reducers.bytes.per.reducer", "40"); + pigServer.registerQuery("A = load 'table_testCrossAfterGroupAll' as (a0:int, a1:chararray);"); + pigServer.registerQuery("B = group A all;"); + pigServer.registerQuery("C = foreach B generate COUNT(A);"); + pigServer.registerQuery("D = cross A, C;"); + Path output = FileLocalizer.getTemporaryPath(pigServer.getPigContext()); + ExecJob job = pigServer.store("D", output.toString()); + FileSystem fs = output.getFileSystem(cluster.getConfiguration()); + FileStatus[] partFiles = fs.listStatus(output, new PathFilter() { + @Override + public boolean accept(Path path) { + if (path.getName().startsWith("part")) { + return true; + } + return false; + } + }); + // auto-parallelism is 2 in MR, 20 in Tez, so check >=2 + Assert.assertTrue(partFiles.length >= 2); + // Check the output + Iterator<Tuple> iter = job.getResults(); + List<Tuple> results = new ArrayList<Tuple>(); + while (iter.hasNext()) { + results.add(iter.next()); + } + Collections.sort(results); + Assert.assertEquals(4, results.size()); + Assert.assertEquals("(1,A,4)", results.get(0).toString()); + Assert.assertEquals("(2,B,4)", results.get(1).toString()); + Assert.assertEquals("(3,C,4)", results.get(2).toString()); + Assert.assertEquals("(4,D,4)", results.get(3).toString()); + } finally { + pigServer.getPigContext().getProperties().remove("pig.exec.reducers.bytes.per.reducer"); + } + } }
Modified: pig/branches/spark/test/org/apache/pig/test/TestEvalPipelineLocal.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestEvalPipelineLocal.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestEvalPipelineLocal.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestEvalPipelineLocal.java Thu Nov 27 12:49:54 2014 @@ -107,7 +107,7 @@ public class TestEvalPipelineLocal { File f1 = createFile(new String[]{"a:1","b:1","a:1"}); pigServer.registerQuery("a = load '" - + Util.generateURI(Util.encodeEscape(f1.toString()), pigServer.getPigContext()) + + Util.generateURI(f1.toString(), pigServer.getPigContext()) + "' using " + PigStorage.class.getName() + "(':');"); pigServer.registerQuery("b = foreach a generate 1-1/1;"); Iterator<Tuple> iter = pigServer.openIterator("b"); @@ -125,10 +125,10 @@ public class TestEvalPipelineLocal { File f2 = createFile(new String[]{"b","b","a"}); pigServer.registerQuery("a = load '" - + Util.generateURI(Util.encodeEscape(f1.toString()), pigServer.getPigContext()) + + Util.generateURI(f1.toString(), pigServer.getPigContext()) + "' using " + PigStorage.class.getName() + "(':');"); pigServer.registerQuery("b = load '" - + Util.generateURI(Util.encodeEscape(f2.toString()), pigServer.getPigContext()) + + Util.generateURI(f2.toString(), pigServer.getPigContext()) + "';"); pigServer.registerQuery("c = cogroup a by $0, b by $0;"); pigServer.registerQuery("d = foreach c generate flatten($1),flatten($2);"); @@ -151,7 +151,7 @@ public class TestEvalPipelineLocal { pw.println("a"); pw.close(); pigServer.registerQuery("a = foreach (load '" - + Util.generateURI(Util.encodeEscape(f.toString()), pigServer.getPigContext()) + + Util.generateURI(f.toString(), pigServer.getPigContext()) + "') generate 1, flatten(" + MyBagFunction.class.getName() + "(*));"); // pigServer.registerQuery("b = foreach a generate $0, flatten($1);"); @@ -340,11 +340,11 @@ public class TestEvalPipelineLocal { expectedResults.put("conference", 1); pigServer.registerQuery("newsArticles = LOAD '" - + Util.generateURI(Util.encodeEscape(newsFile.toString()), pigServer + + Util.generateURI(newsFile.toString(), pigServer .getPigContext()) + "' USING " + TextLoader.class.getName() + "();"); pigServer.registerQuery("queryLog = LOAD '" - + Util.generateURI(Util.encodeEscape(queryLogFile.toString()), pigServer + + Util.generateURI(queryLogFile.toString(), pigServer .getPigContext()) + "';"); pigServer.registerQuery("titleNGrams = FOREACH newsArticles GENERATE flatten(" + TitleNGrams.class.getName() + "(*));"); @@ -401,7 +401,7 @@ public class TestEvalPipelineLocal { String tmpOutputFile = FileLocalizer.getTemporaryPath(pigServer.getPigContext()).toString(); pigServer.registerQuery("A = LOAD '" - + Util.generateURI(Util.encodeEscape(tmpFile.toString()), pigServer + + Util.generateURI(tmpFile.toString(), pigServer .getPigContext()) + "';"); if (eliminateDuplicates){ pigServer.registerQuery("B = DISTINCT (FOREACH A GENERATE $0) PARALLEL 10;"); @@ -448,7 +448,7 @@ public class TestEvalPipelineLocal { ps.close(); pigServer.registerQuery("A = LOAD '" - + Util.generateURI(Util.encodeEscape(tmpFile.toString()), pigServer + + Util.generateURI(tmpFile.toString(), pigServer .getPigContext()) + "';"); pigServer.registerQuery("B = group A by $0;"); String query = "C = foreach B {" @@ -488,7 +488,7 @@ public class TestEvalPipelineLocal { ps.close(); pigServer.registerQuery("A = LOAD '" - + Util.generateURI(Util.encodeEscape(tmpFile.toString()), pigServer + + Util.generateURI(tmpFile.toString(), pigServer .getPigContext()) + "';"); pigServer.registerQuery("B = group A by $0;"); String query = "C = foreach B {" @@ -530,7 +530,7 @@ public class TestEvalPipelineLocal { ps.close(); pigServer.registerQuery("A = LOAD '" - + Util.generateURI(Util.encodeEscape(tmpFile.toString()), pigServer + + Util.generateURI(tmpFile.toString(), pigServer .getPigContext()) + "';"); pigServer.registerQuery("B = limit A 5;"); Iterator<Tuple> iter = pigServer.openIterator("B"); @@ -550,7 +550,7 @@ public class TestEvalPipelineLocal { new String[] {"{(f1, f2),(f3, f4)}\t(1,2)\t[key1#value1,key2#value2]"}); pigServer.registerQuery("a = load '" - + Util.generateURI(Util.encodeEscape(input.toString()), pigServer.getPigContext()) + + Util.generateURI(input.toString(), pigServer.getPigContext()) + "' using PigStorage() " + "as (b:bag{t:tuple(x,y)}, t2:tuple(a,b), m:map[]);"); pigServer.registerQuery("b = foreach a generate COUNT(b), t2.a, t2.b, m#'key1', m#'key2';"); @@ -564,7 +564,7 @@ public class TestEvalPipelineLocal { //test with BinStorage pigServer.registerQuery("a = load '" - + Util.generateURI(Util.encodeEscape(input.toString()), pigServer.getPigContext()) + + Util.generateURI(input.toString(), pigServer.getPigContext()) + "' using PigStorage() " + "as (b:bag{t:tuple(x,y)}, t2:tuple(a,b), m:map[]);"); String output = "./TestEvalPipeline-testComplexData"; @@ -781,7 +781,7 @@ public class TestEvalPipelineLocal { ps.close(); pigServer.registerQuery("A = LOAD '" - + Util.generateURI(Util.encodeEscape(tmpFile.toString()), pigServer + + Util.generateURI(tmpFile.toString(), pigServer .getPigContext()) + "';"); pigServer.registerQuery("B = foreach A generate " + MapUDF.class.getName() + "($0) as mymap;"); // the argument @@ -825,7 +825,7 @@ public class TestEvalPipelineLocal { ps.close(); pigServer.registerQuery("A = LOAD '" - + Util.generateURI(Util.encodeEscape(tmpFile.toString()), pigServer + + Util.generateURI(tmpFile.toString(), pigServer .getPigContext()) + "';"); pigServer.registerQuery("B = foreach A generate " + MapUDF.class.getName() + "($0) as mymap;"); // the argument @@ -873,7 +873,7 @@ public class TestEvalPipelineLocal { ps.close(); pigServer.registerQuery("A = LOAD '" - + Util.generateURI(Util.encodeEscape(tmpFile.toString()), pigServer + + Util.generateURI(tmpFile.toString(), pigServer .getPigContext()) + "';"); pigServer.registerQuery("B = group A by $0;"); String query = "C = foreach B {" @@ -916,7 +916,7 @@ public class TestEvalPipelineLocal { ps.close(); pigServer.registerQuery("A = LOAD '" - + Util.generateURI(Util.encodeEscape(tmpFile.toString()), pigServer + + Util.generateURI(tmpFile.toString(), pigServer .getPigContext()) + "';"); pigServer.registerQuery("B = distinct A;"); String query = "C = foreach B {" @@ -964,7 +964,7 @@ public class TestEvalPipelineLocal { ps.close(); pigServer.registerQuery("A = LOAD '" - + Util.generateURI(Util.encodeEscape(tmpFile.toString()), pigServer + + Util.generateURI(tmpFile.toString(), pigServer .getPigContext()) + "';"); pigServer.registerQuery("B = distinct A;"); String query = "C = foreach B {" @@ -1006,7 +1006,7 @@ public class TestEvalPipelineLocal { ps.close(); pigServer.registerQuery("A = LOAD '" - + Util.generateURI(Util.encodeEscape(tmpFile.toString()), pigServer + + Util.generateURI(tmpFile.toString(), pigServer .getPigContext()) + "';"); pigServer.registerQuery("B = distinct A ;"); //the argument does not matter pigServer.registerQuery("C = foreach B generate FLATTEN(" + Identity.class.getName() + "($0, $1));"); //the argument does not matter @@ -1088,7 +1088,7 @@ public class TestEvalPipelineLocal { @Test public void testSetLocationCalledInFE() throws Exception { File f1 = createFile(new String[]{"a","b"}); - pigServer.registerQuery("a = load '" + Util.generateURI(Util.encodeEscape(f1.toString()), pigServer.getPigContext()) + pigServer.registerQuery("a = load '" + Util.generateURI(f1.toString(), pigServer.getPigContext()) + "' using " + SetLocationTestLoadFunc.class.getName() + "();"); pigServer.registerQuery("b = order a by $0;"); @@ -1101,7 +1101,7 @@ public class TestEvalPipelineLocal { @Test public void testGroupByTuple() throws Exception { File f1 = createFile(new String[]{"1\t2\t3","4\t5\t6"}); - pigServer.registerQuery("a = load '" + Util.generateURI(Util.encodeEscape(f1.toString()), pigServer.getPigContext()) + pigServer.registerQuery("a = load '" + Util.generateURI(f1.toString(), pigServer.getPigContext()) + "' as (x:int, y:int, z:int);"); pigServer.registerQuery("b = foreach a generate TOTUPLE(x, y) as t, z;"); pigServer.registerQuery("c = group b by t;"); @@ -1115,7 +1115,7 @@ public class TestEvalPipelineLocal { // See PIG-3060 public void testFlattenEmptyBag() throws Exception { File f1 = createFile(new String[]{"2\t{}","3\t{(1),(2)}", "4\t{}"}); - pigServer.registerQuery("A = load '" + Util.generateURI(Util.encodeEscape(f1.toString()), pigServer.getPigContext()) + pigServer.registerQuery("A = load '" + Util.generateURI(f1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:bag{(t:chararray)});"); pigServer.registerQuery("B = group A by a0;"); pigServer.registerQuery("C = foreach B { c1 = foreach A generate FLATTEN(a1); generate COUNT(c1);};"); @@ -1231,4 +1231,30 @@ public class TestEvalPipelineLocal { Iterator<Tuple> iter = pigServer.openIterator("D"); Assert.assertEquals(iter.next().toString(), "(lily)"); } + + public static class TOTUPLENOINNERSCHEMA extends EvalFunc<Tuple> { + @Override + public Tuple exec(Tuple input) throws IOException { + return input; + } + } + + // see PIG-4298 + @Test + public void testBytesRawComparatorDesc() throws Exception{ + File f1 = createFile(new String[]{"2", "1", "4", "3"}); + + pigServer.registerQuery("a = load '" + Util.generateURI(f1.toString(), pigServer.getPigContext()) + + "' as (value:long);"); + pigServer.registerQuery("b = foreach a generate " + TOTUPLENOINNERSCHEMA.class.getName() + "(value);"); + pigServer.registerQuery("c = foreach b generate flatten($0);"); + pigServer.registerQuery("d = order c by $0 desc;"); + + Iterator<Tuple> iter = pigServer.openIterator("d"); + Assert.assertEquals(iter.next().toString(), "(4)"); + Assert.assertEquals(iter.next().toString(), "(3)"); + Assert.assertEquals(iter.next().toString(), "(2)"); + Assert.assertEquals(iter.next().toString(), "(1)"); + Assert.assertFalse(iter.hasNext()); + } } Modified: pig/branches/spark/test/org/apache/pig/test/TestFetch.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestFetch.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestFetch.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestFetch.java Thu Nov 27 12:49:54 2014 @@ -96,7 +96,7 @@ public class TestFetch { public void setUp() throws Exception{ pigServer = new PigServer(ExecType.LOCAL, new Properties()); // force direct fetch mode - pigServer.getPigContext().getProperties().setProperty(PigConfiguration.OPT_FETCH, "true"); + pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_OPT_FETCH, "true"); } @Test @@ -127,7 +127,8 @@ public class TestFetch { .compile(lp, null); boolean planFetchable = FetchOptimizer.isPlanFetchable(pigServer.getPigContext(), pp); - assertTrue(planFetchable); + //plan is not fetchable since limit is not pushed up to the loader + assertFalse(planFetchable); } Modified: pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlan.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlan.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlan.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlan.java Thu Nov 27 12:49:54 2014 @@ -59,7 +59,7 @@ public class TestForEachNestedPlan { @Test public void testInnerOrderBy() throws Exception { for (int i = 0; i < nullFlags.length; i++) { - System.err.println("Running testInnerOrderBy with nullFlags set to :" + System.out.println("Running testInnerOrderBy with nullFlags set to :" + nullFlags[i]); File tmpFile = genDataSetFile1(nullFlags[i]); pig.registerQuery("a = load '" Modified: pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java Thu Nov 27 12:49:54 2014 @@ -52,7 +52,7 @@ public class TestForEachNestedPlanLocal System.err.println("Running testInnerOrderBy with nullFlags set to :" + nullFlags[i]); File tmpFile = genDataSetFile1(nullFlags[i]); pig.registerQuery("a = load '" - + Util.generateURI(Util.encodeEscape(tmpFile.toString()), pig.getPigContext()) + + Util.generateURI(tmpFile.toString(), pig.getPigContext()) + "'; "); pig.registerQuery("b = group a by $0; "); pig.registerQuery("c = foreach b { " + " c1 = order $1 by *; " @@ -73,7 +73,7 @@ public class TestForEachNestedPlanLocal public void testInnerLimit() throws Exception { File tmpFile = genDataSetFileOneGroup(); pig.registerQuery("a = load '" - + Util.generateURI(Util.encodeEscape(tmpFile.toString()), pig.getPigContext()) + + Util.generateURI(tmpFile.toString(), pig.getPigContext()) + "'; "); pig.registerQuery("b = group a by $0; "); pig.registerQuery("c = foreach b { " + " c1 = limit $1 5; " @@ -289,4 +289,4 @@ public class TestForEachNestedPlanLocal profilePS.close(); return new File[] { userFile, sessionFile, profileFile }; } -} \ No newline at end of file +} Modified: pig/branches/spark/test/org/apache/pig/test/TestGFCross.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestGFCross.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestGFCross.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestGFCross.java Thu Nov 27 12:49:54 2014 @@ -20,16 +20,16 @@ package org.apache.pig.test; import static org.junit.Assert.assertEquals; import org.apache.hadoop.conf.Configuration; -import org.apache.pig.PigConfiguration; import org.apache.pig.data.DataBag; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; +import org.apache.pig.impl.PigImplConstants; import org.apache.pig.impl.builtin.GFCross; import org.apache.pig.impl.util.UDFContext; import org.junit.Test; public class TestGFCross { - + // Test GFCross returns the correct number of default // join groups @Test @@ -49,7 +49,7 @@ public class TestGFCross { @Test public void testSerial() throws Exception { Configuration cfg = new Configuration(); - cfg.set(PigConfiguration.PIG_CROSS_PARALLELISM_HINT + ".1", "1"); + cfg.set(PigImplConstants.PIG_CROSS_PARALLELISM + ".1", "1"); UDFContext.getUDFContext().addJobConf(cfg); Tuple t = TupleFactory.getInstance().newTuple(2); @@ -65,7 +65,7 @@ public class TestGFCross { @Test public void testParallelSet() throws Exception { Configuration cfg = new Configuration(); - cfg.set(PigConfiguration.PIG_CROSS_PARALLELISM_HINT + ".1", "10"); + cfg.set(PigImplConstants.PIG_CROSS_PARALLELISM + ".1", "10"); UDFContext.getUDFContext().addJobConf(cfg); Tuple t = TupleFactory.getInstance().newTuple(2); Modified: pig/branches/spark/test/org/apache/pig/test/TestGrunt.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestGrunt.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestGrunt.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestGrunt.java Thu Nov 27 12:49:54 2014 @@ -50,6 +50,7 @@ import org.apache.pig.backend.executione import org.apache.pig.impl.PigContext; import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.logicalLayer.FrontendException; +import org.apache.pig.impl.util.JavaCompilerHelper; import org.apache.pig.test.Util.ProcessReturnInfo; import org.apache.pig.tools.grunt.Grunt; import org.apache.pig.tools.pigscript.parser.ParseException; @@ -66,7 +67,7 @@ public class TestGrunt { @BeforeClass public static void oneTimeSetup() throws Exception { - cluster.setProperty(PigConfiguration.OPT_MULTIQUERY,"true"); + cluster.setProperty(PigConfiguration.PIG_OPT_MULTIQUERY,"true"); } @AfterClass @@ -1014,7 +1015,8 @@ public class TestGrunt { +"store a into 'baz';" +"cd /;" +"fs -ls .;" - +"fs -rmr /fstmp/foo/baz;"; + +"fs -rmr /fstmp/foo/baz;" + +"cd"; ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); InputStreamReader reader = new InputStreamReader(cmd); @@ -1445,4 +1447,62 @@ public class TestGrunt { assertEquals(Level.INFO.toString(), pc.getLog4jProperties().getProperty("log4j.logger.org.apache.pig")); } + + @Test + public void testAutoShipUDFContainingJar() throws Throwable { + + String FILE_SEPARATOR = System.getProperty("file.separator"); + File tmpDir = File.createTempFile("test", ""); + tmpDir.delete(); + tmpDir.mkdir(); + + File udfDir = new File(tmpDir.getAbsolutePath() + FILE_SEPARATOR + "com" + FILE_SEPARATOR + + "xxx" + FILE_SEPARATOR + "udf"); + udfDir.mkdirs(); + + String udfSrc = new String("package com.xxx.udf;\n" + + "import java.io.IOException;\n" + + "import org.apache.pig.EvalFunc;\n" + + "import org.apache.pig.data.Tuple;\n" + + "public class TestUDF extends EvalFunc<Integer>{\n" + + "public Integer exec(Tuple input) throws IOException {\n" + + "return 1;}\n" + + "}"); + + // compile + JavaCompilerHelper javaCompilerHelper = new JavaCompilerHelper(); + javaCompilerHelper.compile(tmpDir.getAbsolutePath(), + new JavaCompilerHelper.JavaSourceFromString("com.xxx.udf.TestUDF", udfSrc)); + + String jarName = "TestUDFJar.jar"; + String jarFile = tmpDir.getAbsolutePath() + FILE_SEPARATOR + jarName; + int status = Util.executeJavaCommand("jar -cf " + jarFile + + " -C " + tmpDir.getAbsolutePath() + " " + "com"); + assertEquals(0, status); + + Util.createInputFile(cluster, "table_testAutoShipUDFContainingJar", new String[] { "1" }); + File scriptFile = Util.createFile(new String[] { + "a = load 'table_testAutoShipUDFContainingJar' as (a0:int);" + + "b = foreach a generate com.xxx.udf.TestUDF(a0);" + + "store b into 'output_testAutoShipUDFContainingJar';" + }); + String scriptFileName = scriptFile.getAbsolutePath(); + String execTypeOptions = "-x " + cluster.getExecType() + " "; + String cmd = "java -cp " + System.getProperty("java.class.path") + File.pathSeparator + jarFile + + " org.apache.pig.Main " + execTypeOptions + scriptFileName; + ProcessReturnInfo pri = Util.executeJavaCommandAndReturnInfo(cmd); + assertEquals(pri.exitCode, 0); + String[] lines = pri.stderrContents.split("\n"); + boolean found = false; + for (String line : lines) { + if (line.matches(".*Added jar .*" + jarName + ".*")) { + // MR mode + found = true; + } else if (line.matches(".*Local resource.*" + jarName + ".*")) { + // Tez mode + found = true; + } + } + assertTrue(found); + } } Modified: pig/branches/spark/test/org/apache/pig/test/TestHBaseStorage.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestHBaseStorage.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestHBaseStorage.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestHBaseStorage.java Thu Nov 27 12:49:54 2014 @@ -16,6 +16,7 @@ */ package org.apache.pig.test; +import java.io.File; import java.io.IOException; import java.util.Iterator; import java.util.List; @@ -41,12 +42,14 @@ import org.apache.pig.PigServer; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; import org.apache.pig.backend.hadoop.hbase.HBaseStorage; +import org.apache.pig.data.DataBag; import org.apache.pig.data.DataByteArray; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; +import org.junit.Assume; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -99,7 +102,7 @@ public class TestHBaseStorage { @Before public void beforeTest() throws Exception { - pig = new PigServer(ExecType.LOCAL, conf); + pig = new PigServer(Util.getLocalTestMode(), conf); } @After @@ -123,6 +126,7 @@ public class TestHBaseStorage { deletes.add(new Delete(row.getRow())); } table.delete(deletes); + table.close(); } /** @@ -825,6 +829,115 @@ public class TestHBaseStorage { } /** + * Test merge inner join with two tables + * + * @throws IOException + */ + @Test + public void testMergeJoin() throws IOException { + Assume.assumeTrue("Skip this test for TEZ. See PIG-4315", pig.getPigContext().getExecType().equals(ExecType.LOCAL)); + prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary); + prepareTable(TESTTABLE_2, true, DataFormat.HBaseBinary); + pig.registerQuery("a = load 'hbase://" + TESTTABLE_1 + "' using " + + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('" + + TESTCOLUMN_A + " " + TESTCOLUMN_B + " " + TESTCOLUMN_C + + "','-loadKey -caster HBaseBinaryConverter') as (rowKey:chararray,col_a:int, col_b:double, col_c:chararray);"); + pig.registerQuery("b = load 'hbase://" + TESTTABLE_2 + "' using " + + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('" + + TESTCOLUMN_A + " " + TESTCOLUMN_B + " " + TESTCOLUMN_C + + "','-loadKey -caster HBaseBinaryConverter') as (rowKey:chararray,col_a:int, col_b:double, col_c:chararray);"); + pig.registerQuery("c = join a by rowKey, b by rowKey USING 'merge';"); + pig.registerQuery("d = ORDER c BY a::rowKey;"); + + Iterator<Tuple> it = pig.openIterator("d"); + int count = 0; + LOG.info("MergeJoin Starting"); + while (it.hasNext()) { + Tuple t = it.next(); + // the columns for both relations should be merged into one tuple + // left side + String rowKey = (String) t.get(0); + int col_a = (Integer) t.get(1); + double col_b = (Double) t.get(2); + String col_c = (String) t.get(3); + + Assert.assertEquals("00".substring((count + "").length()) + count, + rowKey); + Assert.assertEquals(count, col_a); + Assert.assertEquals(count + 0.0, col_b, 1e-6); + Assert.assertEquals("Text_" + count, col_c); + + // right side + String rowKey2 = (String) t.get(4); + int col_a2 = (Integer) t.get(5); + double col_b2 = (Double) t.get(6); + String col_c2 = (String) t.get(7); + + Assert.assertEquals("00".substring((count + "").length()) + count, + rowKey2); + Assert.assertEquals(count, col_a2); + Assert.assertEquals(count + 0.0, col_b2, 1e-6); + Assert.assertEquals("Text_" + count, col_c2); + + count++; + } + Assert.assertEquals(count, TEST_ROW_COUNT); + LOG.info("MergeJoin done"); + } + + /** + * Test collected group + * not much to test here since keys are unique + * + * @throws IOException + */ + @Test + public void testCollectedGroup() throws IOException { + prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary); + prepareTable(TESTTABLE_2, true, DataFormat.HBaseBinary); + pig.registerQuery("a = load 'hbase://" + TESTTABLE_1 + "' using " + + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('" + + TESTCOLUMN_A + " " + TESTCOLUMN_B + " " + TESTCOLUMN_C + + "','-loadKey -caster HBaseBinaryConverter') as (rowKey:chararray,col_a:int, col_b:double, col_c:chararray);"); + pig.registerQuery("c = group a by rowKey USING 'collected';"); + pig.registerQuery("d = ORDER c BY group;"); + + // do a merge group + Iterator<Tuple> it = pig.openIterator("d"); + int count = 0; + LOG.info("CollectedGroup Starting"); + while (it.hasNext()) { + Tuple t = it.next(); + + String rowKey = (String)t.get(0); + + Assert.assertEquals("00".substring((count + "").length()) + count, + rowKey); + + int rowCount = 0; + DataBag rows = (DataBag)t.get(1); + for (Iterator<Tuple> iter = rows.iterator(); iter.hasNext();) { + Tuple row = iter.next(); + + // there should be two bags with all 3 columns + int col_a = (Integer) row.get(1); + double col_b = (Double) row.get(2); + String col_c = (String) row.get(3); + + Assert.assertEquals(count, col_a); + Assert.assertEquals(count + 0.0, col_b, 1e-6); + Assert.assertEquals("Text_" + count, col_c); + rowCount++; + } + Assert.assertEquals(1, rowCount); + + count++; + } + Assert.assertEquals(TEST_ROW_COUNT, count); + LOG.info("CollectedGroup done"); + } + + /** * Test Load from hbase using HBaseBinaryConverter */ @Test @@ -892,6 +1005,7 @@ public class TestHBaseStorage { pig.getPigContext().getProperties() .setProperty(MRConfiguration.FILEOUTPUTCOMMITTER_MARKSUCCESSFULJOBS, "false"); + table.close(); } /** @@ -928,6 +1042,7 @@ public class TestHBaseStorage { Assert.assertEquals(i + 0.0, col_b, 1e-6); } Assert.assertEquals(100, i); + table.close(); } /** @@ -964,6 +1079,7 @@ public class TestHBaseStorage { Assert.assertEquals("Text_" + i, col_c); } Assert.assertEquals(100, i); + table.close(); } /** @@ -1026,6 +1142,7 @@ public class TestHBaseStorage { Assert.assertEquals(i + 0.0, col_b, 1e-6); } Assert.assertEquals(100, i); + table.close(); } /** @@ -1061,6 +1178,7 @@ public class TestHBaseStorage { Assert.assertEquals(i + 0.0 + "", col_b); } Assert.assertEquals(100, i); + table.close(); } /** @@ -1101,6 +1219,43 @@ public class TestHBaseStorage { Assert.assertEquals(index, TEST_ROW_COUNT); } + @Test + // See PIG-4151 + public void testStoreEmptyMap() throws IOException { + String tableName = "emptyMapTest"; + HTable table; + try { + deleteAllRows(tableName); + } catch (Exception e) { + // It's ok, table might not exist. + } + byte[][] cfs = new byte[2][]; + cfs[0] = Bytes.toBytes("info"); + cfs[1] = Bytes.toBytes("friends"); + try { + table = util.createTable(Bytes.toBytesBinary(tableName), + cfs); + } catch (Exception e) { + table = new HTable(conf, Bytes.toBytesBinary(tableName)); + } + + File inputFile = Util.createInputFile("test", "tmp", new String[] {"row1;Homer;Morrison;[1#Silvia,2#Stacy]", + "row2;Sheila;Fletcher;[1#Becky,2#Salvador,3#Lois]", + "row4;Andre;Morton;[1#Nancy]", + "row3;Sonja;Webb;[]" + }); + pig.registerQuery("source = LOAD '" + Util.generateURI(inputFile.toString(), pig.getPigContext()) + + "' USING PigStorage(';')" + + " AS (row:chararray, first_name:chararray, last_name:chararray, friends:map[]);"); + pig.registerQuery("STORE source INTO 'hbase://" + tableName + "' USING" + + " org.apache.pig.backend.hadoop.hbase.HBaseStorage('info:fname info:lname friends:*');"); + Get get = new Get(Bytes.toBytes("row3")); + Result r = table.get(get); + Assert.assertEquals(new String(r.getValue(cfs[0], Bytes.toBytes("fname"))), "Sonja"); + Assert.assertEquals(new String(r.getValue(cfs[0], Bytes.toBytes("lname"))), "Webb"); + Assert.assertTrue(r.getFamilyMap(cfs[1]).isEmpty()); + } + private void scanTable1(PigServer pig, DataFormat dataFormat) throws IOException { scanTable1(pig, dataFormat, ""); } Modified: pig/branches/spark/test/org/apache/pig/test/TestJobControlCompiler.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestJobControlCompiler.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestJobControlCompiler.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestJobControlCompiler.java Thu Nov 27 12:49:54 2014 @@ -31,8 +31,10 @@ import java.net.URLClassLoader; import java.util.ArrayList; import java.util.Arrays; import java.util.Enumeration; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.jar.JarEntry; import java.util.jar.JarFile; import java.util.jar.JarOutputStream; @@ -47,6 +49,7 @@ import javax.tools.ToolProvider; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.filecache.DistributedCache; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.jobcontrol.JobControl; @@ -55,11 +58,12 @@ import org.apache.pig.ExecType; import org.apache.pig.FuncSpec; import org.apache.pig.LoadFunc; import org.apache.pig.PigServer; +import org.apache.pig.backend.hadoop.executionengine.JobCreationException; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad; +import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.builtin.PigStorage; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.io.FileSpec; @@ -72,7 +76,7 @@ public class TestJobControlCompiler { private static final Configuration CONF = new Configuration(); - + @BeforeClass public static void setupClass() throws Exception { // creating a hadoop-site.xml and making it visible to Pig @@ -126,13 +130,14 @@ public class TestJobControlCompiler { // verifying the jar gets on distributed cache Path[] fileClassPaths = DistributedCache.getFileClassPaths(jobConf); - Assert.assertEquals("size for "+Arrays.toString(fileClassPaths), 8, fileClassPaths.length); + // guava jar is not shipped with Hadoop 2.x + Assert.assertEquals("size for "+Arrays.toString(fileClassPaths), HadoopShims.isHadoopYARN() ? 5 : 6, fileClassPaths.length); Path distributedCachePath = fileClassPaths[0]; Assert.assertEquals("ends with jar name: "+distributedCachePath, distributedCachePath.getName(), tmpFile.getName()); // hadoop bug requires path to not contain hdfs://hotname in front - Assert.assertTrue("starts with /: "+distributedCachePath, + Assert.assertTrue("starts with /: "+distributedCachePath, distributedCachePath.toString().startsWith("/")); - Assert.assertTrue("jar pushed to distributed cache should contain testUDF", + Assert.assertTrue("jar pushed to distributed cache should contain testUDF", jarContainsFileNamed(new File(fileClassPaths[0].toUri().getPath()), testUDFFileName)); } @@ -171,15 +176,9 @@ public class TestJobControlCompiler { StringUtils.join(zipArchives, ",")); pigContext.getProperties().put("pig.streaming.cache.files", StringUtils.join(tarArchives, ",")); - final JobControlCompiler jobControlCompiler = new JobControlCompiler( - pigContext, CONF); - final MROperPlan plan = new MROperPlan(); - plan.add(new MapReduceOper(new OperatorKey())); + final JobConf jobConf = compileTestJob(pigContext, CONF); - final JobControl jobControl = jobControlCompiler.compile(plan, "test"); - final JobConf jobConf = jobControl.getWaitingJobs().get(0).getJobConf(); - URI[] uris = DistributedCache.getCacheFiles(jobConf); int sizeTxt = 0; for (int i = 0; i < uris.length; i++) { @@ -193,6 +192,100 @@ public class TestJobControlCompiler { ".tar.gz", ".tar"); } + private JobConf compileTestJob(final PigContext pigContext, Configuration conf) + throws JobCreationException { + final JobControlCompiler jobControlCompiler = new JobControlCompiler( + pigContext, conf); + + final MROperPlan plan = new MROperPlan(); + plan.add(new MapReduceOper(new OperatorKey())); + + final JobControl jobControl = jobControlCompiler.compile(plan, "test"); + final JobConf jobConf = jobControl.getWaitingJobs().get(0).getJobConf(); + return jobConf; + } + + /** + * Tests that no duplicate jars are added to distributed cache, which might cause conflicts + * and tests with both symlinked and normal jar specification + */ + @Test + public void testNoDuplicateJarsInDistributedCache() throws Exception { + + // JobControlCompiler setup + final PigServer pigServer = new PigServer(ExecType.MAPREDUCE); + PigContext pigContext = pigServer.getPigContext(); + pigContext.connect(); + + Configuration conf = new Configuration(); + DistributedCache.addFileToClassPath(new Path(new URI("/lib/udf-0.jar#udf.jar")), conf, FileSystem.get(conf)); + DistributedCache.addFileToClassPath(new Path(new URI("/lib/udf1.jar#diffname.jar")), conf, FileSystem.get(conf)); + DistributedCache.addFileToClassPath(new Path(new URI("/lib/udf2.jar")), conf, FileSystem.get(conf)); + createAndAddResource("udf.jar", pigContext); + createAndAddResource("udf1.jar", pigContext); + createAndAddResource("udf2.jar", pigContext); + createAndAddResource("another.jar", pigContext); + + final JobConf jobConf = compileTestJob(pigContext, conf); + + // verifying the jar gets on distributed cache + URI[] cacheURIs = DistributedCache.getCacheFiles(jobConf); + Path[] fileClassPaths = DistributedCache.getFileClassPaths(jobConf); + // expected - 1. udf.jar#udf.jar, 2. udf1.jar#diffname.jar 3. udf2.jar (same added twice) + // 4. another.jar and 5. udf1.jar, and not duplicate udf.jar + System.out.println("cache.files= " + Arrays.toString(cacheURIs)); + System.out.println("classpath.files= " + Arrays.toString(fileClassPaths)); + if (HadoopShims.isHadoopYARN()) { + // Default jars - 5 (pig, antlr, joda-time, automaton) + // Other jars - 10 (udf.jar#udf.jar, udf1.jar#diffname.jar, udf2.jar, udf1.jar, another.jar + Assert.assertEquals("size 9 for " + Arrays.toString(cacheURIs), 9, + Arrays.asList(StringUtils.join(cacheURIs, ",").split(",")).size()); + Assert.assertEquals("size 9 for " + Arrays.toString(fileClassPaths), 9, + Arrays.asList(StringUtils.join(fileClassPaths, ",").split(",")).size()); + } else { + // Default jars - 5. Has guava in addition + // There will be same entries duplicated for udf.jar and udf2.jar + Assert.assertEquals("size 12 for " + Arrays.toString(cacheURIs), 12, + Arrays.asList(StringUtils.join(cacheURIs, ",").split(",")).size()); + Assert.assertEquals("size 12 for " + Arrays.toString(fileClassPaths), 12, + Arrays.asList(StringUtils.join(fileClassPaths, ",").split(",")).size()); + } + + // Count occurrences of the resources + Map<String, Integer> occurrences = new HashMap<String, Integer>(); + + for (URI cacheURI : cacheURIs) { + Integer val = occurrences.get(cacheURI.toString()); + val = (val == null) ? 1 : ++val; + occurrences.put(cacheURI.toString(), val); + } + if (HadoopShims.isHadoopYARN()) { + Assert.assertEquals(9, occurrences.size()); + } else { + Assert.assertEquals(10, occurrences.size()); //guava jar in addition + } + + for (String file : occurrences.keySet()) { + if (!HadoopShims.isHadoopYARN() && (file.endsWith("udf.jar") || file.endsWith("udf2.jar"))) { + // Same path added twice which is ok. It should not be a shipped to hdfs temp path. + // We assert path is same by checking count + Assert.assertEquals("Two occurrences for " + file, 2, (int) occurrences.get(file)); + } else { + // check that only single occurrence even though we added once to dist cache (simulating via Oozie) + // and second time through pig register jar when there is symlink + Assert.assertEquals("One occurrence for " + file, 1, (int) occurrences.get(file)); + } + } + } + + private File createAndAddResource(String name, PigContext pigContext) throws IOException { + File f = new File(name); + f.createNewFile(); + f.deleteOnExit(); + pigContext.addJar(name); + return f; + } + @Test public void testEstimateNumberOfReducers() throws Exception { Assert.assertEquals(2, JobControlCompiler.estimateNumberOfReducers( @@ -229,7 +322,7 @@ public class TestJobControlCompiler { } /** - * checks if the given file name is in the jar + * checks if the given file name is in the jar * @param jarFile the jar to check * @param name the name to find (full path in the jar) * @return true if the name was found Modified: pig/branches/spark/test/org/apache/pig/test/TestJoin.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestJoin.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestJoin.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestJoin.java Thu Nov 27 12:49:54 2014 @@ -87,7 +87,7 @@ public class TestJoin { fileName = fileNameHint; } else if (execType == ExecType.LOCAL) { File f = Util.createInputFile("test", fileNameHint, data); - fileName = Util.generateURI(Util.encodeEscape(f.getAbsolutePath()), pigServer.getPigContext()); + fileName = Util.generateURI(f.getAbsolutePath(), pigServer.getPigContext()); } return fileName; } @@ -733,4 +733,4 @@ public class TestJoin { } assertTrue("All expected tuples should have been found, remaining: "+expected, expected.isEmpty()); } -} \ No newline at end of file +} Modified: pig/branches/spark/test/org/apache/pig/test/TestJsonLoaderStorage.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestJsonLoaderStorage.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestJsonLoaderStorage.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestJsonLoaderStorage.java Thu Nov 27 12:49:54 2014 @@ -111,7 +111,27 @@ public class TestJsonLoaderStorage { "\"l\":null," + "\"m\":null" + "}"; - + + private static final String bigDecimalJson = + "{" + + "\"a\":123.456," + + "\"b\":\"123.456\"" + + "}"; + + private static final String badJson = + "{" + + "\"a\":\"good\"," + + "\"b\":\"good\"" + + "}\n" + + "{" + + "\"a\":bad," + + "\"b\":\"good\"" + + "}\n" + + "{" + + "\"a\":\"good\"," + + "\"b\":\"good\"" + + "}"; + private static final String jsonOutput = "{\"f1\":\"18\",\"count\":3}"; @@ -214,6 +234,50 @@ public class TestJsonLoaderStorage { assertEquals(1, count); } + @SuppressWarnings("rawtypes") + @Test + public void testJsonLoaderBadRow() throws IOException{ + + String badJsonFile = createInput(badJson); + pigServer.registerQuery("data = load '" + badJsonFile + "' using JsonLoader('a:chararray, b:chararray');"); + Iterator<Tuple> tuples = pigServer.openIterator("data"); + + Tuple t = tuples.next(); + assertTrue(t.size()==2); + assertTrue(t.get(0)!=null); + assertTrue(t.get(1)!=null); + assertTrue(tuples.hasNext()); + + // bad row - skip it, returning a null tuple. + t = tuples.next(); + assertTrue(t.size()==2); + assertTrue(t.get(0)==null); + assertTrue(t.get(1)==null); + assertTrue(tuples.hasNext()); + + t = tuples.next(); + assertTrue(t.size()==2); + assertTrue(t.get(0)!=null); + assertTrue(t.get(1)!=null); + assertTrue(!tuples.hasNext()); + } + + @SuppressWarnings("rawtypes") + @Test + public void testJsonLoaderBigDecimalFormats() throws IOException{ + + String bigDecimalJsonFile = createInput(bigDecimalJson); + pigServer.registerQuery("data = load '" + bigDecimalJsonFile + "' using JsonLoader('a:bigdecimal, b:bigdecimal');"); + Iterator<Tuple> tuples = pigServer.openIterator("data"); + + Tuple t = tuples.next(); + assertTrue(t.size()==2); + assertTrue(t.get(0)!=null); + assertTrue(t.get(1)!=null); + assertEquals(t.get(0), t.get(1)); + assertTrue(!tuples.hasNext()); + } + @Test public void testJsonLoaderNull() throws IOException { Iterator<Tuple> tuples = loadJson(nullJson); @@ -346,7 +410,7 @@ public class TestJsonLoaderStorage { tempJsonFile.delete(); // Pig query to run - pigServer.registerQuery("IP = load '"+ Util.generateURI(Util.encodeEscape(input.toString()), pigServer.getPigContext()) + pigServer.registerQuery("IP = load '"+ Util.generateURI(input.toString(), pigServer.getPigContext()) +"' using PigStorage (';') as (ID:chararray,DETAILS:chararray);"); pigServer.registerQuery( "id_details = FOREACH IP GENERATE " + Modified: pig/branches/spark/test/org/apache/pig/test/TestLimitVariable.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestLimitVariable.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestLimitVariable.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestLimitVariable.java Thu Nov 27 12:49:54 2014 @@ -20,16 +20,19 @@ package org.apache.pig.test; import java.io.File; import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import org.apache.pig.PigServer; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.data.Tuple; +import org.apache.pig.impl.PigImplConstants; import org.apache.pig.impl.logicalLayer.FrontendException; +import org.apache.pig.impl.util.ObjectSerializer; import org.apache.pig.newplan.logical.visitor.ScalarVariableValidator; import org.junit.AfterClass; -import org.junit.Assume; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -116,6 +119,34 @@ public class TestLimitVariable { Util.checkQueryOutputs(itE, expectedResE); } + @Test + public void testLimitVariable4() throws IOException { + String query = + "a = load '" + inputFile.getName() + "' as (x:int, y:int);" + + "b = group a all;" + + "c = foreach b generate COUNT(a) as sum;" + + "d = order a by $0 DESC;" + + "e = filter d by $0 != 4;" + + "f = limit e c.sum/2;" // return top half of the tuples + ; + + try { + HashSet<String> disabledOptimizerRules = new HashSet<String>(); + disabledOptimizerRules.add("PushUpFilter"); + pigServer.getPigContext().getProperties().setProperty(PigImplConstants.PIG_OPTIMIZER_RULES_KEY, + ObjectSerializer.serialize(disabledOptimizerRules)); + Util.registerMultiLineQuery(pigServer, query); + Iterator<Tuple> it = pigServer.openIterator("f"); + + // Even if push up filter is disabled order should be retained + List<Tuple> expectedRes = Util.getTuplesFromConstantTupleStrings(new String[] { + "(6,15)", "(5,10)", "(3,10)" }); + Util.checkQueryOutputs(it, expectedRes); + } finally { + pigServer.getPigContext().getProperties().remove(PigImplConstants.PIG_OPTIMIZER_RULES_KEY); + } + } + @Test(expected=FrontendException.class) public void testLimitVariableException1() throws Throwable { String query = @@ -179,4 +210,20 @@ public class TestLimitVariable { "(1,11)", "(2,3)", "(3,10)", "(6,15)" }); Util.checkQueryOutputsAfterSort(it, expectedRes); } + + @Test + public void testZeroLimitVariable() throws Throwable { + String query = + "a = load '" + inputFile.getName() + "';" + + "b = group a all;" + + "c = foreach b generate COUNT(a) as sum;" + + "d = limit a c.sum - c.sum; " + ; + + Util.registerMultiLineQuery(pigServer, query); + Iterator<Tuple> it = pigServer.openIterator("d"); + + List<Tuple> emptyresult = new ArrayList<Tuple>(0); + Util.checkQueryOutputsAfterSort(it, emptyresult); + } } Modified: pig/branches/spark/test/org/apache/pig/test/TestLoad.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestLoad.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestLoad.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestLoad.java Thu Nov 27 12:49:54 2014 @@ -306,7 +306,7 @@ public class TestLoad { boolean[] multiquery = {true, false}; for (boolean b : multiquery) { - pc.getProperties().setProperty(PigConfiguration.OPT_MULTIQUERY, "" + b); + pc.getProperties().setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + b); DataStorage dfs = pc.getDfs(); dfs.setActiveContainer(dfs.asContainer("/tmp")); Modified: pig/branches/spark/test/org/apache/pig/test/TestLocal.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestLocal.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestLocal.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestLocal.java Thu Nov 27 12:49:54 2014 @@ -100,7 +100,7 @@ public class TestLocal { public Double bigGroupAll( File tmpFile ) throws Throwable { String query = "foreach (group (load '" - + Util.generateURI(Util.encodeEscape(tmpFile.toString()), pig.getPigContext()) + + Util.generateURI(tmpFile.toString(), pig.getPigContext()) + "') all) generate " + COUNT.class.getName() + "($1) ;"; System.out.println(query); pig.registerQuery("asdf_id = " + query); @@ -188,7 +188,7 @@ public class TestLocal { //Load, Execute and Store query String query = "foreach (load '" - + Util.generateURI(Util.encodeEscape(tmpFile.toString()), pig.getPigContext()) + + Util.generateURI(tmpFile.toString(), pig.getPigContext()) + "') generate $0,$1;"; System.out.println(query); pig.registerQuery("asdf_id = " + query); @@ -239,7 +239,7 @@ public class TestLocal { // Load, Execute and Store query String query = "foreach (load '" - + Util.generateURI(Util.encodeEscape(tmpFile.toString()), pig.getPigContext()) + + Util.generateURI(tmpFile.toString(), pig.getPigContext()) + "') generate $0,$1;"; System.out.println(query); pig.registerQuery("asdf_id = " + query); @@ -284,7 +284,7 @@ public class TestLocal { // execute query String query = "foreach (group (load '" - + Util.generateURI(Util.encodeEscape(tmpFile.toString()), pig.getPigContext()) + + Util.generateURI(tmpFile.toString(), pig.getPigContext()) + "' using " + MyStorage.class.getName() + "()) by " + MyGroup.class.getName() + "('all')) generate flatten(" + MyApply.class.getName() + "($1)) ;"; @@ -323,7 +323,7 @@ public class TestLocal { // execute query String query = "foreach (group (load '" - + Util.generateURI(Util.encodeEscape(tmpFile.toString()), pig.getPigContext()) + + Util.generateURI(tmpFile.toString(), pig.getPigContext()) + "' using " + MyStorage.class.getName() + "()) by " + MyGroup.class.getName() + "('all')) generate flatten(" + MyApply.class.getName() + "($1)) ;"; @@ -358,7 +358,7 @@ public class TestLocal { pig.registerFunction("foo", new FuncSpec(MyApply.class.getName()+"('foo')")); String query = "foreach (group (load '" - + Util.generateURI(Util.encodeEscape(tmpFile.toString()), pig.getPigContext()) + + Util.generateURI(tmpFile.toString(), pig.getPigContext()) + "' using " + MyStorage.class.getName() + "()) by " + MyGroup.class.getName() + "('all')) generate flatten(foo($1)) ;"; @@ -406,7 +406,7 @@ public class TestLocal { pig.registerFunction("foo", new FuncSpec(MyApply.class.getName()+"('foo')")); String query = "foreach (group (load '" - + Util.generateURI(Util.encodeEscape(tmpFile.toString()), pig.getPigContext()) + + Util.generateURI(tmpFile.toString(), pig.getPigContext()) + "' using " + MyStorage.class.getName() + "()) by " + MyGroup.class.getName() + "('all')) generate flatten(foo($1)) ;"; @@ -515,4 +515,4 @@ public class TestLocal { return data; } -} \ No newline at end of file +} Modified: pig/branches/spark/test/org/apache/pig/test/TestLocal2.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestLocal2.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestLocal2.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestLocal2.java Thu Nov 27 12:49:54 2014 @@ -54,10 +54,10 @@ public class TestLocal2 { File tmpFile1 = genDataSetFile(false, 30 ) ; File tmpFile2 = genDataSetFile(false, 50 ) ; pig.registerQuery("a = load '" - + Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pig.getPigContext()) + + Util.generateURI(tmpFile1.toString(), pig.getPigContext()) + "'; "); pig.registerQuery("b = load '" - + Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pig.getPigContext()) + + Util.generateURI(tmpFile2.toString(), pig.getPigContext()) + "'; "); pig.registerQuery("c = union a, b; ") ; @@ -70,10 +70,10 @@ public class TestLocal2 { File tmpFile1 = genDataSetFile(true, 30 ) ; File tmpFile2 = genDataSetFile(true, 50 ) ; pig.registerQuery("a = load '" - + Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pig.getPigContext()) + + Util.generateURI(tmpFile1.toString(), pig.getPigContext()) + "'; "); pig.registerQuery("b = load '" - + Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pig.getPigContext()) + + Util.generateURI(tmpFile2.toString(), pig.getPigContext()) + "'; "); pig.registerQuery("c = union a, b; ") ; @@ -86,10 +86,10 @@ public class TestLocal2 { File tmpFile1 = genDataSetFile(false, 30) ; File tmpFile2 = genDataSetFile(false, 50) ; pig.registerQuery("a = load '" - + Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pig.getPigContext()) + + Util.generateURI(tmpFile1.toString(), pig.getPigContext()) + "'; "); pig.registerQuery("b = load '" - + Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pig.getPigContext()) + + Util.generateURI(tmpFile2.toString(), pig.getPigContext()) + "'; "); pig.registerQuery("a1 = foreach a generate $0, $1; ") ; pig.registerQuery("b1 = foreach b generate $0, $1; ") ; @@ -103,10 +103,10 @@ public class TestLocal2 { File tmpFile1 = genDataSetFile(true, 30) ; File tmpFile2 = genDataSetFile(true, 50) ; pig.registerQuery("a = load '" - + Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pig.getPigContext()) + + Util.generateURI(tmpFile1.toString(), pig.getPigContext()) + "'; "); pig.registerQuery("b = load '" - + Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pig.getPigContext()) + + Util.generateURI(tmpFile2.toString(), pig.getPigContext()) + "'; "); pig.registerQuery("a1 = foreach a generate $0, $1; ") ; pig.registerQuery("b1 = foreach b generate $0, $1; ") ; @@ -125,7 +125,7 @@ public class TestLocal2 { ps.close(); pig.registerQuery("A = load '" - + Util.generateURI(Util.encodeEscape(fp1.toString()), pig.getPigContext()) + + Util.generateURI(fp1.toString(), pig.getPigContext()) + "'; "); pig.registerQuery("B = foreach A generate flatten(" + Pig800Udf.class.getName() + "($0));"); @@ -148,7 +148,7 @@ public class TestLocal2 { ps.close(); pig.registerQuery("A = load '" - + Util.generateURI(Util.encodeEscape(fp1.toString()), pig.getPigContext()) + + Util.generateURI(fp1.toString(), pig.getPigContext()) + "'; "); pig.registerQuery("B = foreach A generate flatten(" + Pig800Udf.class.getName() + "($0));"); @@ -171,7 +171,7 @@ public class TestLocal2 { ps.close(); pig.registerQuery("A = load '" - + Util.generateURI(Util.encodeEscape(fp1.toString()), pig.getPigContext()) + + Util.generateURI(fp1.toString(), pig.getPigContext()) + "' AS (c1:int, c2:int); "); pig.registerQuery("B = filter A by c1 > 0;"); pig.registerQuery("C = filter B by c1 < 2;"); @@ -207,10 +207,10 @@ public class TestLocal2 { pig.registerQuery("A = load '" - + Util.generateURI(Util.encodeEscape(fp1.toString()), pig.getPigContext()) + + Util.generateURI(fp1.toString(), pig.getPigContext()) + "'AS (a0:int, a1:int); "); pig.registerQuery("B = load '" - + Util.generateURI(Util.encodeEscape(fp2.toString()), pig.getPigContext()) + + Util.generateURI(fp2.toString(), pig.getPigContext()) + "'AS (b0:int, b1:int); "); pig.registerQuery("C = join A by a0, B by b0;"); Modified: pig/branches/spark/test/org/apache/pig/test/TestMRJobStats.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestMRJobStats.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestMRJobStats.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestMRJobStats.java Thu Nov 27 12:49:54 2014 @@ -27,6 +27,8 @@ import java.io.PrintWriter; import java.io.RandomAccessFile; import java.lang.reflect.Constructor; import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.Iterator; import java.util.List; import java.util.Properties; @@ -126,8 +128,8 @@ public class TestMRJobStats { getJobStatsMethod("setId", JobID.class).invoke(jobStats, jobID); jobStats.setSuccessful(true); - getJobStatsMethod("addMapReduceStatistics", TaskReport[].class, TaskReport[].class) - .invoke(jobStats, mapTaskReports, reduceTaskReports); + getJobStatsMethod("addMapReduceStatistics", Iterator.class, Iterator.class) + .invoke(jobStats, Arrays.asList(mapTaskReports).iterator(), Arrays.asList(reduceTaskReports).iterator()); String msg = (String)getJobStatsMethod("getDisplayString") .invoke(jobStats); @@ -156,8 +158,8 @@ public class TestMRJobStats { getJobStatsMethod("setId", JobID.class).invoke(jobStats, jobID); jobStats.setSuccessful(true); - getJobStatsMethod("addMapReduceStatistics", TaskReport[].class, TaskReport[].class) - .invoke(jobStats, mapTaskReports, reduceTaskReports); + getJobStatsMethod("addMapReduceStatistics", Iterator.class, Iterator.class) + .invoke(jobStats, Arrays.asList(mapTaskReports).iterator(), Arrays.asList(reduceTaskReports).iterator()); String msg = (String)getJobStatsMethod("getDisplayString") .invoke(jobStats); System.out.println(JobStats.SUCCESS_HEADER); Modified: pig/branches/spark/test/org/apache/pig/test/TestMapSideCogroup.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestMapSideCogroup.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestMapSideCogroup.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestMapSideCogroup.java Thu Nov 27 12:49:54 2014 @@ -479,7 +479,7 @@ public class TestMapSideCogroup { @Override public void initialize(Configuration conf) throws IOException { - is = FileSystem.get(conf).open(new Path(loc)); + is = FileSystem.get(new Path(loc).toUri(), conf).open(new Path(loc)); } @Override Modified: pig/branches/spark/test/org/apache/pig/test/TestMultiQuery.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestMultiQuery.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestMultiQuery.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestMultiQuery.java Thu Nov 27 12:49:54 2014 @@ -52,7 +52,7 @@ public class TestMultiQuery { Util.copyFromLocalToLocal( "test/org/apache/pig/test/data/passwd2", "passwd2"); Properties props = new Properties(); - props.setProperty(PigConfiguration.OPT_MULTIQUERY, ""+true); + props.setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, ""+true); myPig = new PigServer(ExecType.LOCAL, props); } @@ -813,6 +813,32 @@ public class TestMultiQuery { } } + @Test + public void testMultiQueryJiraPig4170() throws Exception { + + Storage.Data data = Storage.resetData(myPig); + data.set("inputLocation", Storage.tuple(1, "hello"), Storage.tuple(2, "world")); + + myPig.setBatchOn(); + myPig.registerQuery("A = load 'inputLocation' using mock.Storage() as (a:int, b:chararray);"); + myPig.registerQuery("A1 = group A by a;"); + myPig.registerQuery("A2 = group A by b;"); + myPig.registerQuery("store A1 into 'output1' using mock.Storage();"); + myPig.registerQuery("store A2 into 'output2' using mock.Storage();"); + + myPig.executeBatch(); + + myPig.registerQuery("A = load 'output1' using mock.Storage() as (a:int, c:bag{(i:int, s:chararray)});"); + Iterator<Tuple> iter = myPig.openIterator("A"); + iter.next().toString().equals("(1,{(1,hello)})"); + iter.next().toString().equals("(2,{(2,world)})"); + + myPig.registerQuery("A = load 'output2' using mock.Storage() as (b:chararray, c:bag{(i:int, s:chararray)});"); + iter = myPig.openIterator("A"); + iter.next().toString().equals("(hello,{(1,hello)})"); + iter.next().toString().equals("(world,{(2,world)})"); + } + // -------------------------------------------------------------------------- // Helper methods Modified: pig/branches/spark/test/org/apache/pig/test/TestMultiQueryBasic.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestMultiQueryBasic.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestMultiQueryBasic.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestMultiQueryBasic.java Thu Nov 27 12:49:54 2014 @@ -69,7 +69,7 @@ public class TestMultiQueryBasic { Util.copyFromLocalToLocal( "test/org/apache/pig/test/data/passwd2", "passwd2"); Properties props = new Properties(); - props.setProperty(PigConfiguration.OPT_MULTIQUERY, ""+true); + props.setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, ""+true); myPig = new PigServer(ExecType.LOCAL, props); } Modified: pig/branches/spark/test/org/apache/pig/test/TestMultiQueryCompiler.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestMultiQueryCompiler.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestMultiQueryCompiler.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestMultiQueryCompiler.java Thu Nov 27 12:49:54 2014 @@ -82,7 +82,7 @@ public class TestMultiQueryCompiler { @Before public void setUp() throws Exception { - cluster.setProperty(PigConfiguration.OPT_MULTIQUERY, ""+true); + cluster.setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, ""+true); myPig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); deleteOutputFiles(); } Modified: pig/branches/spark/test/org/apache/pig/test/TestMultiQueryLocal.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestMultiQueryLocal.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestMultiQueryLocal.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestMultiQueryLocal.java Thu Nov 27 12:49:54 2014 @@ -67,7 +67,7 @@ public class TestMultiQueryLocal { @Before public void setUp() throws Exception { PigContext context = new PigContext(ExecType.LOCAL, new Properties()); - context.getProperties().setProperty(PigConfiguration.OPT_MULTIQUERY, ""+true); + context.getProperties().setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, ""+true); myPig = new PigServer(context); myPig.getPigContext().getProperties().setProperty("pig.usenewlogicalplan", "false"); myPig.getPigContext().getProperties().setProperty(PigConfiguration.PIG_TEMP_DIR, "build/test/tmp/"); Modified: pig/branches/spark/test/org/apache/pig/test/TestOrderBy.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestOrderBy.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestOrderBy.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestOrderBy.java Thu Nov 27 12:49:54 2014 @@ -65,8 +65,7 @@ public class TestOrderBy { ps.println("1\t" + DATA[1][i] + "\t" + DATA[0][i]); } ps.close(); - - DateTimeZone.setDefault(DateTimeZone.forOffsetMillis(DateTimeZone.UTC.getOffset(null))); + } @After Modified: pig/branches/spark/test/org/apache/pig/test/TestPOPartialAgg.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPOPartialAgg.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestPOPartialAgg.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestPOPartialAgg.java Thu Nov 27 12:49:54 2014 @@ -17,10 +17,18 @@ */ package org.apache.pig.test; -import static org.junit.Assert.*; +import static org.apache.pig.builtin.mock.Storage.tuple; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import org.apache.hadoop.conf.Configuration; import org.apache.pig.FuncSpec; @@ -38,9 +46,11 @@ import org.apache.pig.builtin.IntSum; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; import org.apache.pig.impl.plan.PlanException; +import org.apache.pig.impl.util.Spillable; import org.apache.pig.parser.ParserException; import org.apache.pig.test.utils.GenPhyOp; import org.junit.After; +import org.junit.AfterClass; import org.junit.Before; import org.junit.Test; @@ -50,12 +60,19 @@ import com.google.common.base.Strings; * Test POPartialAgg runtime */ public class TestPOPartialAgg { - POPartialAgg partAggOp; - PhysicalPlan parentPlan; - Tuple dummyTuple = null; + + private static ExecutorService executor = Executors.newSingleThreadExecutor(); + private POPartialAgg partAggOp; + private PhysicalPlan parentPlan; + + @AfterClass + public static void oneTimeTearDown() { + executor.shutdownNow(); + } @Before public void setUp() throws Exception { + PigMapReduce.sJobConfInternal.set(new Configuration()); createPOPartialPlan(1); } @@ -73,14 +90,14 @@ public class TestPOPartialAgg { // setup value plans List<PhysicalPlan> valuePlans = new ArrayList<PhysicalPlan>(); - + for (int i = 0; i < valueCount; i++) { // project arg for udf PhysicalPlan valPlan = new PhysicalPlan(); POProject projVal1 = new POProject(GenPhyOp.getOK(), -1, i + 1); projVal1.setResultType(DataType.BAG); valPlan.add(projVal1); - + // setup udf List<PhysicalOperator> udfInps = new ArrayList<PhysicalOperator>(); udfInps.add(projVal1); @@ -89,7 +106,7 @@ public class TestPOPartialAgg { sumSpec); valPlan.add(sumUdf); valPlan.connect(projVal1, sumUdf); - + valuePlans.add(valPlan); } @@ -225,7 +242,7 @@ public class TestPOPartialAgg { public void testMultiVals() throws Exception { // more than one value to be aggregated createPOPartialPlan(2); - + // input tuple has key, and bag containing SUM.Init output String[] inputTups = { "(1,(1L),(2L))", "(2,(2L),(1L))", "(1,(2L),(2L))" }; String[] outputTups = { "(1,(3L),(4L))", "(2,(2L),(1L))" }; @@ -234,21 +251,21 @@ public class TestPOPartialAgg { @Test public void testMultiValCheckNotDisabled() throws Exception { - // "large" number of values per input to aggregate but good reduction - // in size due to aggregation. - // This case should result in a reduction from 10500 inputs to 500 - // outputs (factor of 20), so in-memory aggregation should not be - // disabled in checkSize(). If it is disabled, too many output rows + // "large" number of values per input to aggregate but good reduction + // in size due to aggregation. + // This case should result in a reduction from 10500 inputs to 500 + // outputs (factor of 20), so in-memory aggregation should not be + // disabled in checkSize(). If it is disabled, too many output rows // will be generated. - + int numKeys = 500; int numVals = 3; createPOPartialPlan(numVals); - + // Build a string of values to use in all input tuples String vals = Strings.repeat(",(1L)", numVals); - + // And input tuples. // We need the next multiple of numKeys over 10,000 because we need to // trigger the size check (at 10,000), and we want an even multiple of @@ -258,7 +275,7 @@ public class TestPOPartialAgg { for (int i = 0; i < numInputs; i++) { inputTups[i] = "(" + (i % numKeys) + vals + ")"; } - + // Build expected results int expectedVal = numInputs / numKeys; vals = Strings.repeat(",(" + expectedVal + "L)", numVals); @@ -266,17 +283,100 @@ public class TestPOPartialAgg { for (int i = 0; i < numKeys; i++) { outputTups[i] = "(" + i + vals + ")"; } - + // input tuple has key, and bag containing SUM.Init output checkInputAndOutput(inputTups, outputTups, false); } - - + + @Test + public void testMemorySpill1() throws Exception { + // Test spill which only does aggregation + Result res; + for (long i=1; i <= 15; i ++) { + Tuple t = tuple(1, tuple(i)); + partAggOp.attachInput(t); + res = partAggOp.getNextTuple(); + assertEquals(POStatus.STATUS_EOP, res.returnStatus); + } + Future<Long> spilled = executor.submit(new Spill(partAggOp)); + Thread.sleep(100); + partAggOp.attachInput(tuple(2, tuple(-1L))); + assertFalse(spilled.isDone()); + res = partAggOp.getNextTuple(); + // Since it was aggregated there should be no records emitted + assertEquals(POStatus.STATUS_EOP, res.returnStatus); + Thread.sleep(100); + assertTrue(spilled.isDone()); + assertEquals(new Long(1), spilled.get()); + + List<Tuple> expectedValues = new ArrayList<Tuple>(); + expectedValues.add(tuple(1, tuple(120L))); //aggregated result + expectedValues.add(tuple(2, tuple(-1L))); + // end of all input, now expecting all tuples + parentPlan.endOfAllInput = true; + res = partAggOp.getNextTuple(); + do { + assertEquals(POStatus.STATUS_OK, res.returnStatus); + assertTrue(expectedValues.remove(res.result)); + res = partAggOp.getNextTuple(); + } while (res.returnStatus != POStatus.STATUS_EOP); + assertTrue(expectedValues.isEmpty()); + } + + @Test + public void testMemorySpill2() throws Exception { + // Test spill which emits records as aggregation does not meet secondary tier threshold + Result res = null; + List<Tuple> expectedValues = new ArrayList<Tuple>(); + //POPartialAgg.SECOND_TIER_THRESHOLD evaluates to 2000 by default + for (long i=1; i <= 2001; i ++) { + Tuple t = tuple(i, tuple(i)); + expectedValues.add(t); + partAggOp.attachInput(t); + res = partAggOp.getNextTuple(); + assertEquals(POStatus.STATUS_EOP, res.returnStatus); + } + Future<Long> spilled = executor.submit(new Spill(partAggOp)); + Thread.sleep(100); + partAggOp.attachInput(tuple(2, tuple(-1L))); + expectedValues.add(tuple(2, tuple(-1L))); + + long i = 0; + res = partAggOp.getNextTuple(); + do { + assertFalse(spilled.isDone()); + assertEquals(POStatus.STATUS_OK, res.returnStatus); + assertTrue(expectedValues.remove(res.result)); + i++; + res = partAggOp.getNextTuple(); + } while (res.returnStatus != POStatus.STATUS_EOP); + assertEquals(2002, i); + assertTrue(expectedValues.isEmpty()); + Thread.sleep(100); + assertTrue(spilled.isDone()); + assertEquals(new Long(1), spilled.get()); + } + + private static class Spill implements Callable<Long> { + + private Spillable spillable; + + public Spill(Spillable spillable) { + this.spillable = spillable; + } + + @Override + public Long call() throws Exception { + return spillable.spill(); + } + + } + /** * run the plan on inputTups and check if output matches outputTups if * isMapMemEmpty is set to true, set memory available for the hash-map to * zero - * + * * @param inputTups * @param outputTups * @param isMapMemEmpty @@ -287,9 +387,8 @@ public class TestPOPartialAgg { private void checkInputAndOutput(String[] inputTups, String[] outputTups, boolean isMapMemEmpty) throws Exception { - PigMapReduce.sJobConfInternal.set(new Configuration()); if (isMapMemEmpty) { - PigMapReduce.sJobConfInternal.get().set(PigConfiguration.PROP_CACHEDBAG_MEMUSAGE, + PigMapReduce.sJobConfInternal.get().set(PigConfiguration.PIG_CACHEDBAG_MEMUSAGE, "0"); } @@ -324,7 +423,7 @@ public class TestPOPartialAgg { res = partAggOp.getNextTuple(); assertEquals(POStatus.STATUS_EOP, res.returnStatus); - Util.compareActualAndExpectedResults(outputs, expectedOuts); + Util.checkQueryOutputsAfterSort(outputs, expectedOuts); } else { while (true) { Result res = partAggOp.getNextTuple(); @@ -332,7 +431,7 @@ public class TestPOPartialAgg { break; } } - Util.compareActualAndExpectedResults(outputs, expectedOuts); + Util.checkQueryOutputsAfterSort(outputs, expectedOuts); } } Modified: pig/branches/spark/test/org/apache/pig/test/TestPOPartialAggPlan.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPOPartialAggPlan.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestPOPartialAggPlan.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestPOPartialAggPlan.java Thu Nov 27 12:49:54 2014 @@ -64,7 +64,7 @@ public class TestPOPartialAggPlan { public void testMapAggPropFalse() throws Exception{ //test with pig.exec.mapPartAgg set to false String query = getGByQuery(); - pc.getProperties().setProperty(PigConfiguration.PROP_EXEC_MAP_PARTAGG, "false"); + pc.getProperties().setProperty(PigConfiguration.PIG_EXEC_MAP_PARTAGG, "false"); MROperPlan mrp = Util.buildMRPlan(query, pc); assertEquals(mrp.size(), 1); @@ -75,7 +75,7 @@ public class TestPOPartialAggPlan { public void testMapAggPropTrue() throws Exception{ //test with pig.exec.mapPartAgg to true String query = getGByQuery(); - pc.getProperties().setProperty(PigConfiguration.PROP_EXEC_MAP_PARTAGG, "true"); + pc.getProperties().setProperty(PigConfiguration.PIG_EXEC_MAP_PARTAGG, "true"); MROperPlan mrp = Util.buildMRPlan(query, pc); assertEquals(mrp.size(), 1); @@ -102,7 +102,7 @@ public class TestPOPartialAggPlan { String query = "l = load 'x' as (a,b,c);" + "g = group l by a;" + "f = foreach g generate group;"; - pc.getProperties().setProperty(PigConfiguration.PROP_EXEC_MAP_PARTAGG, "true"); + pc.getProperties().setProperty(PigConfiguration.PIG_EXEC_MAP_PARTAGG, "true"); MROperPlan mrp = Util.buildMRPlan(query, pc); assertEquals(mrp.size(), 1); @@ -115,7 +115,7 @@ public class TestPOPartialAggPlan { String query = "l = load 'x' as (a,b,c);" + "g = group l by a;" + "f = foreach g generate group, COUNT(l.b), l.b;"; - pc.getProperties().setProperty(PigConfiguration.PROP_EXEC_MAP_PARTAGG, "true"); + pc.getProperties().setProperty(PigConfiguration.PIG_EXEC_MAP_PARTAGG, "true"); MROperPlan mrp = Util.buildMRPlan(query, pc); assertEquals(mrp.size(), 1);
