Modified: pig/branches/spark/test/org/apache/pig/test/TestMultiQuery.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestMultiQuery.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestMultiQuery.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestMultiQuery.java Fri Mar 4 18:17:39 2016 @@ -28,6 +28,7 @@ import java.util.Properties; import org.apache.pig.PigConfiguration; import org.apache.pig.PigServer; import org.apache.pig.backend.executionengine.ExecJob; +import org.apache.pig.builtin.mock.Storage; import org.apache.pig.data.Tuple; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.logicalLayer.schema.Schema; @@ -38,7 +39,6 @@ import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import org.apache.pig.builtin.mock.Storage; @RunWith(JUnit4.class) public class TestMultiQuery { @@ -811,6 +811,75 @@ public class TestMultiQuery { iter.next().toString().equals("(world,{(2,world)})"); } + @Test + public void testMultiQueryJiraPig4480() throws Exception { + + Storage.Data data = Storage.resetData(myPig); + data.set("inputLocation", + Storage.tuple(1, Storage.bag(Storage.tuple("hello"), Storage.tuple("world"), Storage.tuple("program"))), + Storage.tuple(2, Storage.bag(Storage.tuple("my"), Storage.tuple("world")))); + + myPig.setBatchOn(); + myPig.registerQuery("A = load 'inputLocation' using mock.Storage() as (a:int, b:bag{(c:chararray)});"); + myPig.registerQuery("A = foreach A generate a, flatten(b);"); + myPig.registerQuery("A1 = foreach A generate a;"); + myPig.registerQuery("A1 = distinct A1;"); + myPig.registerQuery("A2 = filter A by c == 'world';"); + myPig.registerQuery("A2 = ORDER A2 by a parallel 2;"); + myPig.registerQuery("store A1 into 'output1' using mock.Storage();"); + myPig.registerQuery("store A2 into 'output2' using mock.Storage();"); + + myPig.executeBatch(); + + List<Tuple> actualResults = data.get("output1"); + List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings( + new String[] {"(1)", "(2)"}); + Util.checkQueryOutputs(actualResults.iterator(), expectedResults); + + actualResults = data.get("output2"); + expectedResults = Util.getTuplesFromConstantTupleStrings( + new String[] {"(1, 'world')", "(2, 'world')"}); + Util.checkQueryOutputs(actualResults.iterator(), expectedResults); + } + + @Test + public void testMultiQueryJiraPig4493() throws Exception { + + // Union followed by Split + Storage.Data data = Storage.resetData(myPig); + data.set("inputLocation", + Storage.tuple("1", "Dyson"), + Storage.tuple("2", "Miele"), + Storage.tuple("3", "Black & Decker") + ); + + myPig.setBatchOn(); + myPig.registerQuery("A = load 'inputLocation' using mock.Storage();"); + myPig.registerQuery("A = foreach A generate (int)$0 as a, (chararray)$1 as b;"); + myPig.registerQuery("A1 = FILTER A by b matches '.*[a-zA-Z] *& *[a-zA-Z].*';"); + myPig.registerQuery("A1 = FOREACH A1 generate a, REPLACE(b,'&','and') as b;"); + myPig.registerQuery("A = UNION A1, A;"); + myPig.registerQuery("A = FOREACH A generate a, LOWER(b) as b;"); + myPig.registerQuery("A2 = GROUP A by a;"); + myPig.registerQuery("A2 = FOREACH A2 generate group, COUNT(A) as cnt;"); + myPig.registerQuery("store A2 into 'output1' using mock.Storage();"); + myPig.registerQuery("A = FILTER A BY b is not null and b != '';"); + myPig.registerQuery("store A into 'output2' using mock.Storage();"); + + myPig.executeBatch(); + + List<Tuple> actualResults = data.get("output1"); + List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings( + new String[] {"(1, 1L)", "(2, 1L)", "(3, 2L)"}); + Util.checkQueryOutputsAfterSort(actualResults.iterator(), expectedResults); + + actualResults = data.get("output2"); + expectedResults = Util.getTuplesFromConstantTupleStrings(new String[] { + "(1, 'dyson')", "(2, 'miele')", "(3, 'black & decker')", + "(3, 'black and decker')" }); + Util.checkQueryOutputsAfterSort(actualResults.iterator(), expectedResults); + } + // -------------------------------------------------------------------------- // Helper methods
Modified: pig/branches/spark/test/org/apache/pig/test/TestMultiQueryLocal.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestMultiQueryLocal.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestMultiQueryLocal.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestMultiQueryLocal.java Fri Mar 4 18:17:39 2016 @@ -27,8 +27,6 @@ import java.util.Collections; import java.util.Iterator; import java.util.Properties; -import junit.framework.Assert; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -58,6 +56,7 @@ import org.apache.pig.tools.pigscript.pa import org.apache.pig.tools.pigstats.JobStats; import org.apache.pig.tools.pigstats.PigStats; import org.junit.After; +import org.junit.Assert; import org.junit.Assume; import org.junit.Before; import org.junit.Test; @@ -84,7 +83,7 @@ public class TestMultiQueryLocal { } @Test - public void testMultiQueryWithTwoStores() { + public void testMultiQueryWithTwoStores() throws Exception { System.out.println("===== test multi-query with 2 stores ====="); @@ -106,32 +105,23 @@ public class TestMultiQueryLocal { Assert.assertTrue(executePlan(pp)); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail(); } finally { deleteOutputFiles(); } } @Test - public void testEmptyExecute() { + public void testEmptyExecute() throws Exception { System.out.println("=== test empty execute ==="); - try { - myPig.setBatchOn(); - myPig.executeBatch(); - myPig.executeBatch(); - myPig.discardBatch(); - } - catch (Exception e) { - e.printStackTrace(); - Assert.fail(); - } + myPig.setBatchOn(); + myPig.executeBatch(); + myPig.executeBatch(); + myPig.discardBatch(); } @Test - public void testMultiQueryWithTwoStores2() { + public void testMultiQueryWithTwoStores2() throws Exception { System.out.println("===== test multi-query with 2 stores (2) ====="); @@ -147,16 +137,13 @@ public class TestMultiQueryLocal { myPig.executeBatch(); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail(); } finally { deleteOutputFiles(); } } @Test - public void testMultiQueryWithTwoStores2Execs() { + public void testMultiQueryWithTwoStores2Execs() throws Exception { System.out.println("===== test multi-query with 2 stores (2) ====="); @@ -175,16 +162,13 @@ public class TestMultiQueryLocal { myPig.executeBatch(); myPig.discardBatch(); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail(); } finally { deleteOutputFiles(); } } @Test - public void testMultiQueryWithThreeStores() { + public void testMultiQueryWithThreeStores() throws Exception { System.out.println("===== test multi-query with 3 stores ====="); @@ -206,16 +190,13 @@ public class TestMultiQueryLocal { Assert.assertTrue(executePlan(pp)); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail(); } finally { deleteOutputFiles(); } } @Test - public void testMultiQueryWithThreeStores2() { + public void testMultiQueryWithThreeStores2() throws Exception { System.out.println("===== test multi-query with 3 stores (2) ====="); @@ -234,16 +215,13 @@ public class TestMultiQueryLocal { myPig.executeBatch(); myPig.discardBatch(); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail(); } finally { deleteOutputFiles(); } } @Test - public void testMultiQueryWithTwoLoads() { + public void testMultiQueryWithTwoLoads() throws Exception { System.out.println("===== test multi-query with two loads ====="); @@ -268,16 +246,13 @@ public class TestMultiQueryLocal { Assert.assertTrue(executePlan(pp)); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail(); } finally { deleteOutputFiles(); } } @Test - public void testMultiQueryWithTwoLoads2() { + public void testMultiQueryWithTwoLoads2() throws Exception { System.out.println("===== test multi-query with two loads (2) ====="); @@ -298,58 +273,45 @@ public class TestMultiQueryLocal { myPig.executeBatch(); myPig.discardBatch(); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail(); } finally { deleteOutputFiles(); } } @Test - public void testMultiQueryWithNoStore() { + public void testMultiQueryWithNoStore() throws Exception { System.out.println("===== test multi-query with no store ====="); - try { - myPig.setBatchOn(); + myPig.setBatchOn(); - myPig.registerQuery("a = load 'test/org/apache/pig/test/data/passwd' " + - "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);"); - myPig.registerQuery("b = filter a by uid > 5;"); - myPig.registerQuery("group b by gid;"); + myPig.registerQuery("a = load 'test/org/apache/pig/test/data/passwd' " + + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);"); + myPig.registerQuery("b = filter a by uid > 5;"); + myPig.registerQuery("group b by gid;"); - LogicalPlan lp = checkLogicalPlan(0, 0, 0); + LogicalPlan lp = checkLogicalPlan(0, 0, 0); - // XXX Physical plan has one less node in the local case - PhysicalPlan pp = checkPhysicalPlan(lp, 0, 0, 0); + // XXX Physical plan has one less node in the local case + checkPhysicalPlan(lp, 0, 0, 0); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail(); - } } @Test - public void testMultiQueryWithNoStore2() { + public void testMultiQueryWithNoStore2() throws Exception { System.out.println("===== test multi-query with no store (2) ====="); - try { - myPig.setBatchOn(); + myPig.setBatchOn(); - myPig.registerQuery("a = load 'test/org/apache/pig/test/data/passwd' " + - "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);"); - myPig.registerQuery("b = filter a by uid > 5;"); - myPig.registerQuery("group b by gid;"); + myPig.registerQuery("a = load 'test/org/apache/pig/test/data/passwd' " + + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);"); + myPig.registerQuery("b = filter a by uid > 5;"); + myPig.registerQuery("group b by gid;"); - myPig.executeBatch(); - myPig.discardBatch(); + myPig.executeBatch(); + myPig.discardBatch(); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail(); - } } public static class PigStorageWithConfig extends PigStorage { @@ -428,43 +390,38 @@ public class TestMultiQueryLocal { // See PIG-2912 @Test - public void testMultiStoreWithConfig() { + public void testMultiStoreWithConfig() throws Exception { System.out.println("===== test multi-query with competing config ====="); - try { - myPig.setBatchOn(); + myPig.setBatchOn(); - myPig.registerQuery("a = load 'test/org/apache/pig/test/data/passwd' " + - "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);"); - myPig.registerQuery("b = filter a by uid < 5;"); - myPig.registerQuery("c = filter a by uid > 5;"); - myPig.registerQuery("store b into '" + TMP_DIR + "/Pig-TestMultiQueryLocal1' using " + PigStorageWithConfig.class.getName() + "('test.key1', 'a');"); - myPig.registerQuery("store c into '" + TMP_DIR + "/Pig-TestMultiQueryLocal2' using " + PigStorageWithConfig.class.getName() + "('test.key2', 'b');"); + myPig.registerQuery("a = load 'test/org/apache/pig/test/data/passwd' " + + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);"); + myPig.registerQuery("b = filter a by uid < 5;"); + myPig.registerQuery("c = filter a by uid > 5;"); + myPig.registerQuery("store b into '" + TMP_DIR + "/Pig-TestMultiQueryLocal1' using " + PigStorageWithConfig.class.getName() + "('test.key1', 'a');"); + myPig.registerQuery("store c into '" + TMP_DIR + "/Pig-TestMultiQueryLocal2' using " + PigStorageWithConfig.class.getName() + "('test.key2', 'b');"); - myPig.executeBatch(); - myPig.discardBatch(); - FileSystem fs = FileSystem.getLocal(new Configuration()); - BufferedReader reader = new BufferedReader(new InputStreamReader - (fs.open(Util.getFirstPartFile(new Path(TMP_DIR + "/Pig-TestMultiQueryLocal1"))))); - String line; - while ((line = reader.readLine())!=null) { - Assert.assertTrue(line.endsWith("a")); - } - reader = new BufferedReader(new InputStreamReader - (fs.open(Util.getFirstPartFile(new Path(TMP_DIR + "/Pig-TestMultiQueryLocal2"))))); - while ((line = reader.readLine())!=null) { - Assert.assertTrue(line.endsWith("b")); - } - - } catch (Exception e) { - e.printStackTrace(); - Assert.fail(); + myPig.executeBatch(); + myPig.discardBatch(); + FileSystem fs = FileSystem.getLocal(new Configuration()); + BufferedReader reader = new BufferedReader(new InputStreamReader + (fs.open(Util.getFirstPartFile(new Path("file:///" + TMP_DIR + "/Pig-TestMultiQueryLocal1"))))); + String line; + while ((line = reader.readLine())!=null) { + Assert.assertTrue(line.endsWith("a")); + } + reader = new BufferedReader(new InputStreamReader + (fs.open(Util.getFirstPartFile(new Path("file:///" + TMP_DIR + "/Pig-TestMultiQueryLocal2"))))); + while ((line = reader.readLine())!=null) { + Assert.assertTrue(line.endsWith("b")); } + } @Test - public void testMultiQueryWithExplain() { + public void testMultiQueryWithExplain() throws Exception { System.out.println("===== test multi-query with explain ====="); @@ -479,16 +436,13 @@ public class TestMultiQueryLocal { parser.setInteractive(false); parser.parseStopOnError(); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail(); } finally { deleteOutputFiles(); } } @Test - public void testMultiQueryWithDump() { + public void testMultiQueryWithDump() throws Exception { System.out.println("===== test multi-query with dump ====="); @@ -503,16 +457,13 @@ public class TestMultiQueryLocal { parser.setInteractive(false); parser.parseStopOnError(); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail(); } finally { deleteOutputFiles(); } } @Test - public void testMultiQueryWithDescribe() { + public void testMultiQueryWithDescribe() throws Exception { System.out.println("===== test multi-query with describe ====="); @@ -527,9 +478,6 @@ public class TestMultiQueryLocal { parser.setInteractive(false); parser.parseStopOnError(); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail(); } finally { deleteOutputFiles(); } @@ -554,9 +502,6 @@ public class TestMultiQueryLocal { myPig.getPigContext().getProperties().setProperty("pig.usenewlogicalplan", "true"); parser.parseStopOnError(); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail(); } finally { deleteOutputFiles(); myPig.getPigContext().getProperties().setProperty("pig.usenewlogicalplan", "false"); @@ -564,7 +509,7 @@ public class TestMultiQueryLocal { } @Test - public void testStoreOrder() { + public void testStoreOrder() throws Exception { System.out.println("===== multi-query store order ====="); try { @@ -583,7 +528,7 @@ public class TestMultiQueryLocal { myPig.registerQuery("store c into '" + TMP_DIR + "/Pig-TestMultiQueryLocal5';"); LogicalPlan lp = checkLogicalPlan(1, 3, 12); - PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 15); + checkPhysicalPlan(lp, 1, 3, 15); myPig.executeBatch(); myPig.discardBatch(); @@ -594,10 +539,6 @@ public class TestMultiQueryLocal { Assert.assertTrue(new File(TMP_DIR + "/Pig-TestMultiQueryLocal4").exists()); Assert.assertTrue(new File(TMP_DIR + "/Pig-TestMultiQueryLocal5").exists()); - - } catch (Exception e) { - e.printStackTrace(); - Assert.fail(); } finally { deleteOutputFiles(); } @@ -657,7 +598,7 @@ public class TestMultiQueryLocal { int expectedLeaves, int expectedSize) throws IOException { lp.optimize(myPig.getPigContext()); - System.out.println("===== check physical plan ====="); + System.out.println("===== check physical plan ====="); PhysicalPlan pp = ((HExecutionEngine)myPig.getPigContext().getExecutionEngine()).compile( lp, null); Modified: pig/branches/spark/test/org/apache/pig/test/TestPOCast.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPOCast.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestPOCast.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestPOCast.java Fri Mar 4 18:17:39 2016 @@ -1777,12 +1777,11 @@ public class TestPOCast { plan.attachInput(t); DataByteArray dba = (DataByteArray) t.get(0); Result res = op.getNextDataByteArray(); - assertEquals(POStatus.STATUS_ERR, res.returnStatus); + assertEquals(POStatus.STATUS_OK, res.returnStatus); planToTestBACasts.attachInput(t); res = opWithInputTypeAsBA.getNextDataByteArray(); - if(res.returnStatus == POStatus.STATUS_OK) - assertEquals(POStatus.STATUS_ERR, res.returnStatus); + assertEquals(POStatus.STATUS_OK, res.returnStatus); } { Modified: pig/branches/spark/test/org/apache/pig/test/TestPOPartialAgg.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPOPartialAgg.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestPOPartialAgg.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestPOPartialAgg.java Fri Mar 4 18:17:39 2016 @@ -77,8 +77,12 @@ public class TestPOPartialAgg { } private void createPOPartialPlan(int valueCount) throws PlanException { + createPOPartialPlan(valueCount, false); + } + + private void createPOPartialPlan(int valueCount, boolean isGroupAll) throws PlanException { parentPlan = new PhysicalPlan(); - partAggOp = GenPhyOp.topPOPartialAgg(); + partAggOp = new POPartialAgg(GenPhyOp.getOK(), isGroupAll); partAggOp.setParentPlan(parentPlan); // setup key plan @@ -357,6 +361,25 @@ public class TestPOPartialAgg { assertEquals(new Long(1), spilled.get()); } + @Test + public void testGroupAll() throws Exception { + createPOPartialPlan(1, true); + Result res; + for (long i=1; i <= 10010; i ++) { + Tuple t = tuple("all", tuple(i)); + partAggOp.attachInput(t); + res = partAggOp.getNextTuple(); + assertEquals(POStatus.STATUS_EOP, res.returnStatus); + } + // end of all input, now expecting all tuples + parentPlan.endOfAllInput = true; + res = partAggOp.getNextTuple(); + assertEquals(tuple("all", tuple(50105055L)), res.result); + assertEquals(POStatus.STATUS_OK, res.returnStatus); + res = partAggOp.getNextTuple(); + assertEquals(POStatus.STATUS_EOP, res.returnStatus); + } + private static class Spill implements Callable<Long> { private Spillable spillable; Modified: pig/branches/spark/test/org/apache/pig/test/TestPigRunner.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPigRunner.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestPigRunner.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestPigRunner.java Fri Mar 4 18:17:39 2016 @@ -26,6 +26,9 @@ import java.io.File; import java.io.FileWriter; import java.io.IOException; import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -423,6 +426,89 @@ public class TestPigRunner { } @Test + public void simpleMultiQueryTest3() throws Exception { + final String INPUT_FILE_2 = "input2"; + final String OUTPUT_FILE_2 = "output2"; + + PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE_2)); + w.println("3\t4\t5"); + w.println("5\t6\t7"); + w.println("3\t7\t8"); + w.close(); + Util.copyFromLocalToCluster(cluster, INPUT_FILE_2, INPUT_FILE_2); + new File(INPUT_FILE_2).delete(); + + w = new PrintWriter(new FileWriter(PIG_FILE)); + w.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);"); + w.println("A1 = load '" + INPUT_FILE_2 + "' as (a0:int, a1:int, a2:int);"); + w.println("B = filter A by a0 == 3;"); + w.println("C = filter A by a1 <=5;"); + w.println("D = join C by a0, B by a0, A1 by a0 using 'replicated';"); + w.println("store C into '" + OUTPUT_FILE + "';"); + w.println("store D into '" + OUTPUT_FILE_2 + "';"); + w.close(); + + try { + String[] args = { "-x", execType, PIG_FILE }; + PigStats stats = PigRunner.run(args, new TestNotificationListener(execType)); + assertTrue(stats.isSuccessful()); + if (Util.isMapredExecType(cluster.getExecType())) { + assertEquals(3, stats.getJobGraph().size()); + } else { + assertEquals(1, stats.getJobGraph().size()); + } + + // Each output file should include the following: + // output: + // 1\t2\t3\n + // 5\t3\t4\n + // 3\t4\t5\n + // output2: + // 3\t4\t5\t3\t4\t5\t3\t4\t5\n + // 3\t4\t5\t3\t4\t5\t3\t7\t8\n + // 3\t4\t5\t3\t7\t8\t3\t4\t5\n + // 3\t4\t5\t3\t4\t5\t3\t7\t8\n + final int numOfRecords1 = 3; + final int numOfRecords2 = 4; + final int numOfBytesWritten1 = 18; + final int numOfBytesWritten2 = 72; + + assertEquals(numOfRecords1 + numOfRecords2, stats.getRecordWritten()); + assertEquals(numOfBytesWritten1 + numOfBytesWritten2, stats.getBytesWritten()); + + List<String> outputNames = new ArrayList<String>(stats.getOutputNames()); + assertTrue(outputNames.size() == 2); + Collections.sort(outputNames); + assertEquals(OUTPUT_FILE, outputNames.get(0)); + assertEquals(OUTPUT_FILE_2, outputNames.get(1)); + assertEquals(3, stats.getNumberRecords(OUTPUT_FILE)); + assertEquals(4, stats.getNumberRecords(OUTPUT_FILE_2)); + + List<InputStats> inputStats = new ArrayList<InputStats>(stats.getInputStats()); + assertTrue(inputStats.size() == 2); + Collections.sort(inputStats, new Comparator<InputStats>() { + @Override + public int compare(InputStats o1, InputStats o2) { + return o1.getLocation().compareTo(o2.getLocation()); + } + }); + assertEquals(5, inputStats.get(0).getNumberRecords()); + assertEquals(3, inputStats.get(1).getNumberRecords()); + // For mapreduce, since hdfs bytes read includes replicated tables bytes read is wrong + // Since Tez does has only one load per job its values are correct + if (!Util.isMapredExecType(cluster.getExecType())) { + assertEquals(30, inputStats.get(0).getBytes()); + assertEquals(18, inputStats.get(1).getBytes()); + } + } finally { + new File(PIG_FILE).delete(); + Util.deleteFile(cluster, INPUT_FILE_2); + Util.deleteFile(cluster, OUTPUT_FILE); + Util.deleteFile(cluster, OUTPUT_FILE_2); + } + } + + @Test public void MQDepJobFailedTest() throws Exception { final String OUTPUT_FILE_2 = "output2"; PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE)); @@ -974,7 +1060,7 @@ public class TestPigRunner { "OUTPUT_RECORDS").getValue()); assertEquals(20,counter.getGroup(FS_COUNTER_GROUP).getCounterForName( MRPigStatsUtil.HDFS_BYTES_WRITTEN).getValue()); - assertEquals(30,counter.getGroup(FS_COUNTER_GROUP).getCounterForName( + assertEquals(new File(INPUT_FILE).length(),counter.getGroup(FS_COUNTER_GROUP).getCounterForName( MRPigStatsUtil.HDFS_BYTES_READ).getValue()); } else if (execType.equals("spark")) { /** Uncomment code until changes of PIG-4788 are merged to master Modified: pig/branches/spark/test/org/apache/pig/test/TestPigServerLocal.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPigServerLocal.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestPigServerLocal.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestPigServerLocal.java Fri Mar 4 18:17:39 2016 @@ -50,8 +50,10 @@ import org.apache.pig.impl.PigContext; import org.apache.pig.impl.logicalLayer.FrontendException; import org.apache.pig.impl.plan.VisitorException; import org.apache.pig.impl.util.PropertiesUtil; +import org.apache.pig.scripting.ScriptEngine; import org.apache.pig.tools.grunt.Grunt; import org.apache.pig.tools.grunt.GruntParser; +import org.apache.pig.tools.pigstats.PigStats; import org.junit.Before; import org.junit.Test; @@ -249,13 +251,10 @@ public class TestPigServerLocal { @Test public void testSkipParseInRegisterForBatch() throws Throwable { - // numTimesInitiated = 10. 4 (once per registerQuery) + 6 (launchPlan->RandomSampleLoader, - // InputSizeReducerEstimator, getSplits->RandomSampleLoader, - // createRecordReader->RandomSampleLoader, getSplits, createRecordReader) - // numTimesSchemaCalled = 4 (once per registerQuery) if (Util.getLocalTestMode().toString().startsWith("TEZ")) { - _testSkipParseInRegisterForBatch(false, 6, 4); - _testSkipParseInRegisterForBatch(true, 3, 1); + _testSkipParseInRegisterForBatch(false, 8, 4); + _testSkipParseInRegisterForBatch(true, 5, 1); + _testParseBatchWithScripting(5, 1); } else if (Util.getLocalTestMode().toString().startsWith("SPARK")) { // 6 = 4 (Once per registerQuery) + 2 (SortConverter , PigRecordReader) // 4 (Once per registerQuery) @@ -265,13 +264,18 @@ public class TestPigServerLocal { // 1 (registerQuery) _testSkipParseInRegisterForBatch(true, 3, 1); } else { + // numTimesInitiated = 10. 4 (once per registerQuery) + 6 (launchPlan->RandomSampleLoader, + // InputSizeReducerEstimator, getSplits->RandomSampleLoader, + // createRecordReader->RandomSampleLoader, getSplits, createRecordReader) + // numTimesSchemaCalled = 4 (once per registerQuery) _testSkipParseInRegisterForBatch(false, 10, 4); + // numTimesInitiated = 7 (parseAndBuild, launchPlan->RandomSampleLoader, + // InputSizeReducerEstimator, getSplits->RandomSampleLoader, + // createRecordReader->RandomSampleLoader, getSplits, createRecordReader) + // numTimesSchemaCalled = 1 (parseAndBuild) _testSkipParseInRegisterForBatch(true, 7, 1); + _testParseBatchWithScripting(7, 1); } - // numTimesInitiated = 7 (parseAndBuild, launchPlan->RandomSampleLoader, - // InputSizeReducerEstimator, getSplits->RandomSampleLoader, - // createRecordReader->RandomSampleLoader, getSplits, createRecordReader) - // numTimesSchemaCalled = 1 (parseAndBuild) } @Test @@ -330,6 +334,53 @@ public class TestPigServerLocal { } assertEquals(numTimesInitiated, MockTrackingStorage.numTimesInitiated); + assertEquals(numTimesSchemaCalled, MockTrackingStorage.numTimesSchemaCalled); + List<Tuple> out = data.get("bar"); + assertEquals(2, out.size()); + assertEquals(tuple("a", 1, "b"), out.get(0)); + assertEquals(tuple("b", 2, "c"), out.get(1)); + } + + private void _testParseBatchWithScripting(int numTimesInitiated, int numTimesSchemaCalled) throws Throwable { + MockTrackingStorage.numTimesInitiated = 0; + MockTrackingStorage.numTimesSchemaCalled = 0; + + String[] script = { + "#!/usr/bin/python", + "from org.apache.pig.scripting import *", + "P = Pig.compile(\"\"\"" + + "A = load 'foo' USING org.apache.pig.test.TestPigServerLocal\\$MockTrackingStorage();" + + "B = order A by $0,$1,$2;" + + "C = LIMIT B 2;" + + "store C into 'bar' USING mock.Storage();" + + "\"\"\")", + "Q = P.bind()", + "stats = Q.runSingle()", + "if stats.isSuccessful():", + "\tprint 'success!'", + "else:", + "\traise 'failed'" + }; + + Properties properties = new Properties(); + properties.setProperty("io.sort.mb", "2"); + PigContext pigContext = new PigContext(Util.getLocalTestMode(), properties); + PigServer pigServer = new PigServer(pigContext); + Data data = resetData(pigContext); + data.set("foo", tuple("a", 1, "b"), tuple("b", 2, "c"), tuple("c", 3, "d")); + + String scriptFile = tempDir + File.separator + "_testParseBatchWithScripting.py"; + Util.createLocalInputFile(scriptFile , script); + ScriptEngine scriptEngine = ScriptEngine.getInstance("jython"); + Map<String, List<PigStats>> statsMap = scriptEngine.run(pigServer.getPigContext(), scriptFile); + + for (List<PigStats> stats : statsMap.values()) { + for (PigStats s : stats) { + assertTrue(s.isSuccessful()); + } + } + + assertEquals(numTimesInitiated, MockTrackingStorage.numTimesInitiated); assertEquals(numTimesSchemaCalled, MockTrackingStorage.numTimesSchemaCalled); List<Tuple> out = data.get("bar"); assertEquals(2, out.size()); Modified: pig/branches/spark/test/org/apache/pig/test/TestPigStats.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPigStats.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestPigStats.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestPigStats.java Fri Mar 4 18:17:39 2016 @@ -26,9 +26,6 @@ import java.io.FileWriter; import java.io.IOException; import java.io.PrintWriter; -import junit.framework.Assert; - -import org.apache.commons.codec.binary.Base64; import org.apache.commons.io.FileUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -38,6 +35,7 @@ import org.apache.pig.backend.executione import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.util.ObjectSerializer; import org.apache.pig.newplan.logical.relational.LogicalPlan; import org.apache.pig.tools.pigstats.PigStats; import org.junit.Ignore; @@ -46,10 +44,10 @@ import org.junit.Test; @Ignore abstract public class TestPigStats { - private static final Log LOG = LogFactory.getLog(TestPigStats.class); + protected static final Log LOG = LogFactory.getLog(TestPigStats.class); + + abstract public void addSettingsToConf(Configuration conf, String scriptFileName) throws IOException; - abstract public void addSettingsToConf(Configuration conf, String scriptFileName); - @Test public void testPigScriptInConf() throws Exception { PrintWriter w = new PrintWriter(new FileWriter("test.pig")); @@ -58,22 +56,22 @@ abstract public class TestPigStats { w.println("register /mydir/lib/jackson-core-asl-1.4.2.jar"); w.println("register /mydir/lib/jackson-mapper-asl-1.4.2.jar"); w.close(); - + Configuration conf = new Configuration(); addSettingsToConf(conf, "test.pig"); - + String s = conf.get("pig.script"); - String script = new String(Base64.decodeBase64(s.getBytes())); - - String expected = + String script = (String) ObjectSerializer.deserialize(s); + + String expected = "register /mydir/sath.jar\n" + "register /mydir/lib/hadoop-tools-0.20.201.0-SNAPSHOT.jar\n" + "register /mydir/lib/jackson-core-asl-1.4.2.jar\n" + "register /mydir/lib/jackson-mapper-asl-1.4.2.jar\n"; - - Assert.assertEquals(expected, script); + + assertEquals(expected, script); } - + @Test public void testJythonScriptInConf() throws Exception { String[] script = { @@ -90,16 +88,16 @@ abstract public class TestPigStats { "else:", "\traise 'failed'" }; - + Util.createLocalInputFile( "testScript.py", script); - + Configuration conf = new Configuration(); addSettingsToConf(conf, "testScript.py"); - + String s = conf.get("pig.script"); - String actual = new String(Base64.decodeBase64(s.getBytes())); - - String expected = + String actual = (String) ObjectSerializer.deserialize(s); + + String expected = "#!/usr/bin/python\n" + "from org.apache.pig.scripting import *\n" + "Pig.fs(\"rmr simple_out\")\n" + @@ -112,10 +110,10 @@ abstract public class TestPigStats { "\tprint 'success!'\n" + "else:\n" + "\traise 'failed'\n"; - - Assert.assertEquals(expected, actual); + + assertEquals(expected, actual); } - + @Test public void testBytesWritten_JIRA_1027() throws Exception { @@ -157,7 +155,7 @@ abstract public class TestPigStats { pig.registerQuery("D = order C by $1;"); pig.registerQuery("E = limit D 10;"); pig.registerQuery("store E into 'alias_output';"); - + LogicalPlan lp = getLogicalPlan(pig); lp.optimize(pig.getPigContext()); PhysicalPlan pp = ((HExecutionEngine)pig.getPigContext().getExecutionEngine()).compile(lp, @@ -201,7 +199,7 @@ abstract public class TestPigStats { } } } - + private void deleteDirectory(File dir) { try { FileUtils.deleteDirectory(dir); @@ -215,5 +213,5 @@ abstract public class TestPigStats { buildLp.setAccessible(true); return (LogicalPlan ) buildLp.invoke( pig ); } - + } Modified: pig/branches/spark/test/org/apache/pig/test/TestPigStatsMR.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPigStatsMR.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestPigStatsMR.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestPigStatsMR.java Fri Mar 4 18:17:39 2016 @@ -18,24 +18,62 @@ package org.apache.pig.test; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; import java.io.File; +import java.io.IOException; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.pig.PigServer; import org.apache.pig.backend.executionengine.ExecJob; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.plan.OperatorKey; -import org.apache.pig.tools.pigstats.ScriptState; +import org.apache.pig.tools.pigstats.PigStats; import org.apache.pig.tools.pigstats.PigStats.JobGraph; +import org.apache.pig.tools.pigstats.ScriptState; import org.apache.pig.tools.pigstats.mapreduce.MRScriptState; +import org.junit.Test; public class TestPigStatsMR extends TestPigStats { + + @Override + @Test + public void testBytesWritten_JIRA_1027() throws Exception { + + FileLocalizer.setInitialized(false); + // This test cannot be run in MR local mode due to lack of counters + MiniGenericCluster cluster = MiniGenericCluster.buildCluster(); + try { + String filePath = "/tmp/" + this.getClass().getName() + "_" + + "testBytesWritten_JIRA_1027"; + + PigServer pig = new PigServer(cluster.getExecType(), cluster.getProperties()); + String inputFile = "test/org/apache/pig/test/data/passwd"; + Util.copyFromLocalToCluster(cluster, inputFile, inputFile); + pig.registerQuery("A = load '" + inputFile + "';"); + ExecJob job = pig.store("A", filePath); + PigStats stats = job.getStatistics(); + Path dataFile = Util.getFirstPartFile(new Path(filePath)); + FileStatus fs = cluster.getFileSystem().getFileStatus(dataFile); + assertEquals(fs.getLen(), stats.getBytesWritten()); + } catch (IOException e) { + LOG.error("Error while generating file", e); + fail("Encountered IOException"); + } finally { + FileLocalizer.setInitialized(false); + cluster.shutDown(); + } + } + @Override - public void addSettingsToConf(Configuration conf, String scriptFileName) { + public void addSettingsToConf(Configuration conf, String scriptFileName) throws IOException { MRScriptState ss = MRScriptState.get(); ss.setScript(new File(scriptFileName)); MapReduceOper mro = new MapReduceOper(new OperatorKey()); @@ -71,7 +109,7 @@ public class TestPigStatsMR extends Test compile.setAccessible(true); return (MROperPlan) compile.invoke(launcher, new Object[] { pp, ctx }); } - + private static String getAlias(MapReduceOper mro) throws Exception { ScriptState ss = ScriptState.get(); java.lang.reflect.Method getAlias = ss.getClass() Modified: pig/branches/spark/test/org/apache/pig/test/TestRank1.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestRank1.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestRank1.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestRank1.java Fri Mar 4 18:17:39 2016 @@ -19,6 +19,7 @@ package org.apache.pig.test; import static org.apache.pig.builtin.mock.Storage.resetData; import static org.apache.pig.builtin.mock.Storage.tuple; +import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.List; @@ -30,6 +31,12 @@ import org.apache.pig.data.TupleFactory; import org.junit.Before; import org.junit.Test; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMultiset; +import com.google.common.collect.ImmutableSortedSet; +import com.google.common.collect.TreeMultiset; +import com.google.common.collect.Multiset; + public class TestRank1 { private static TupleFactory tf = TupleFactory.getInstance(); private static PigServer pigServer; @@ -70,20 +77,21 @@ public class TestRank1 { + "store C into 'result' using mock.Storage();"; Util.registerMultiLineQuery(pigServer, query); - List<Tuple> expected = Util.getTuplesFromConstantTupleStrings(new String[]{ - "(1L,'A',1,'N')", - "(2L,'B',2,'N')", - "(3L,'C',3,'M')", - "(4L,'D',4,'P')", - "(5L,'E',4,'Q')", - "(6L,'E',4,'Q')", - "(7L,'F',8,'Q')", - "(8L,'F',7,'Q')", - "(9L,'F',8,'T')", - "(10L,'F',8,'Q')", - "(11L,'G',10,'V')" - }); - Util.checkQueryOutputsAfterSort(data.get("result"), expected); + + Multiset<Tuple> expected = ImmutableMultiset.of( + tf.newTuple(ImmutableList.of((long) 1, "A", 1, "N")), + tf.newTuple(ImmutableList.of((long) 2, "B", 2, "N")), + tf.newTuple(ImmutableList.of((long) 3, "C", 3, "M")), + tf.newTuple(ImmutableList.of((long) 4, "D", 4, "P")), + tf.newTuple(ImmutableList.of((long) 5, "E", 4, "Q")), + tf.newTuple(ImmutableList.of((long) 6, "E", 4, "Q")), + tf.newTuple(ImmutableList.of((long) 7, "F", 8, "Q")), + tf.newTuple(ImmutableList.of((long) 8, "F", 7, "Q")), + tf.newTuple(ImmutableList.of((long) 9, "F", 8, "T")), + tf.newTuple(ImmutableList.of((long) 10, "F", 8, "Q")), + tf.newTuple(ImmutableList.of((long) 11, "G", 10, "V"))); + + verifyExpected(data.get("result"), expected); } @Test @@ -93,22 +101,23 @@ public class TestRank1 { + "store B into 'result' using mock.Storage();"; Util.registerMultiLineQuery(pigServer, query); - List<Tuple> expected = Util.getTuplesFromConstantTupleStrings(new String[]{ - "(1L,'Michael', 'Blythe', 1,1, 1, 1, 4557045.046, 98027)", - "(2L,'Linda','Mitchell', 2, 1, 1, 1, 5200475.231, 98027)", - "(3L,'Jillian', 'Carson', 3,1, 1, 1, 3857163.633, 98027)", - "(4L,'Garrett','Vargas', 4, 1, 1, 1, 1764938.986, 98027)", - "(5L,'Tsvi', 'Reiter',5, 1, 1, 2, 2811012.715, 98027)", - "(6L,'Shu', 'Ito', 6,6, 2, 2, 3018725.486, 98055)", - "(7L,'Jose', 'Saraiva',7, 6, 2, 2, 3189356.247, 98055)", - "(8L,'David','Campbell', 8, 6, 2, 3, 3587378.426, 98055)", - "(9L,'Tete', 'Mensa-Annan',9, 6, 2, 3, 1931620.184, 98055)", - "(10L, 'Lynn','Tsoflias', 10, 6, 2, 3, 1758385.926, 98055)", - "(11L, 'Rachel', 'Valdez', 11,6, 2, 4, 2241204.042, 98055)", - "(12L, 'Jae', 'Pak', 12,6, 2, 4, 5015682.375, 98055)", - "(13L, 'Ranjit','Varkey Chudukatil', 13, 6, 2, 4, 3827950.238,98055)" - }); - Util.checkQueryOutputsAfterSort(data.get("result"), expected); + + Multiset<Tuple> expected = ImmutableMultiset.of( + tf.newTuple(ImmutableList.of((long) 1, "Michael", "Blythe", 1,1, 1, 1, 4557045.046, 98027)), + tf.newTuple(ImmutableList.of((long) 2, "Linda","Mitchell", 2, 1, 1, 1, 5200475.231, 98027)), + tf.newTuple(ImmutableList.of((long) 3, "Jillian", "Carson", 3,1, 1, 1, 3857163.633, 98027)), + tf.newTuple(ImmutableList.of((long) 4, "Garrett","Vargas", 4, 1, 1, 1, 1764938.986, 98027)), + tf.newTuple(ImmutableList.of((long) 5, "Tsvi", "Reiter",5, 1, 1, 2, 2811012.715, 98027)), + tf.newTuple(ImmutableList.of((long) 6, "Shu", "Ito", 6,6, 2, 2, 3018725.486, 98055)), + tf.newTuple(ImmutableList.of((long) 7, "Jose", "Saraiva",7, 6, 2, 2, 3189356.247, 98055)), + tf.newTuple(ImmutableList.of((long) 8, "David","Campbell", 8, 6, 2, 3, 3587378.426, 98055)), + tf.newTuple(ImmutableList.of((long) 9, "Tete", "Mensa-Annan",9, 6, 2, 3, 1931620.184, 98055)), + tf.newTuple(ImmutableList.of((long) 10, "Lynn","Tsoflias", 10, 6, 2, 3, 1758385.926, 98055)), + tf.newTuple(ImmutableList.of((long) 11, "Rachel", "Valdez", 11,6, 2, 4, 2241204.042, 98055)), + tf.newTuple(ImmutableList.of((long) 12, "Jae", "Pak", 12,6, 2, 4, 5015682.375, 98055)), + tf.newTuple(ImmutableList.of((long) 13, "Ranjit","Varkey Chudukatil", 13, 6, 2, 4, 3827950.238,98055))); + + verifyExpected(data.get("result"), expected); } @Test @@ -118,20 +127,21 @@ public class TestRank1 { + "store C into 'result' using mock.Storage();"; Util.registerMultiLineQuery(pigServer, query); - List<Tuple> expected = Util.getTuplesFromConstantTupleStrings(new String[]{ - "(1L,'C',3,'M')", - "(2L,'A',1,'N')", - "(2L,'B',2,'N')", - "(4L,'D',4,'P')", - "(5L,'E',4,'Q')", - "(5L,'E',4,'Q')", - "(5L,'F',8,'Q')", - "(5L,'F',7,'Q')", - "(5L,'F',8,'Q')", - "(10L,'F',8,'T')", - "(11L,'G',10,'V')" - }); - Util.checkQueryOutputsAfterSort(data.get("result"), expected); + + Multiset<Tuple> expected = ImmutableMultiset.of( + tf.newTuple(ImmutableList.of((long) 1, "C", 3, "M")), + tf.newTuple(ImmutableList.of((long) 2, "A", 1, "N")), + tf.newTuple(ImmutableList.of((long) 2, "B", 2, "N")), + tf.newTuple(ImmutableList.of((long) 4, "D", 4, "P")), + tf.newTuple(ImmutableList.of((long) 5, "E", 4, "Q")), + tf.newTuple(ImmutableList.of((long) 5, "E", 4, "Q")), + tf.newTuple(ImmutableList.of((long) 5, "F", 8, "Q")), + tf.newTuple(ImmutableList.of((long) 5, "F", 7, "Q")), + tf.newTuple(ImmutableList.of((long) 5, "F", 8, "Q")), + tf.newTuple(ImmutableList.of((long) 10, "F", 8, "T")), + tf.newTuple(ImmutableList.of((long) 11, "G", 10, "V"))); + + verifyExpected(data.get("result"), expected); } @Test @@ -141,20 +151,21 @@ public class TestRank1 { + "store C into 'result' using mock.Storage();"; Util.registerMultiLineQuery(pigServer, query); - List<Tuple> expected = Util.getTuplesFromConstantTupleStrings(new String[]{ - "(1L,'A',1,'N')", - "(2L,'B',2,'N')", - "(3L,'C',3,'M')", - "(4L,'D',4,'P')", - "(4L,'E',4,'Q')", - "(4L,'E',4,'Q')", - "(7L,'F',7,'Q')", - "(8L,'F',8,'Q')", - "(8L,'F',8,'Q')", - "(8L,'F',8,'T')", - "(11L,'G',10,'V')" - }); - Util.checkQueryOutputsAfterSort(data.get("result"), expected); + + Multiset<Tuple> expected = ImmutableMultiset.of( + tf.newTuple(ImmutableList.of((long) 1, "A", 1, "N")), + tf.newTuple(ImmutableList.of((long) 2, "B", 2, "N")), + tf.newTuple(ImmutableList.of((long) 3, "C", 3, "M")), + tf.newTuple(ImmutableList.of((long) 4, "D", 4, "P")), + tf.newTuple(ImmutableList.of((long) 4, "E", 4, "Q")), + tf.newTuple(ImmutableList.of((long) 4, "E", 4, "Q")), + tf.newTuple(ImmutableList.of((long) 7, "F", 7, "Q")), + tf.newTuple(ImmutableList.of((long) 8, "F", 8, "Q")), + tf.newTuple(ImmutableList.of((long) 8, "F", 8, "Q")), + tf.newTuple(ImmutableList.of((long) 8, "F", 8, "T")), + tf.newTuple(ImmutableList.of((long) 11, "G", 10, "V"))); + + verifyExpected(data.get("result"), expected); } @Test @@ -164,20 +175,21 @@ public class TestRank1 { + "store C into 'result' using mock.Storage();"; Util.registerMultiLineQuery(pigServer, query); - List<Tuple> expected = Util.getTuplesFromConstantTupleStrings(new String[]{ - "(1L,'G',10,'V')", - "(2L,'F',8,'T')", - "(2L,'F',8,'Q')", - "(2L,'F',8,'Q')", - "(2L,'F',7,'Q')", - "(6L,'E',4,'Q')", - "(6L,'E',4,'Q')", - "(8L,'D',4,'P')", - "(9L,'C',3,'M')", - "(10L,'B',2,'N')", - "(11L,'A',1,'N')" - }); - Util.checkQueryOutputsAfterSort(data.get("result"), expected); + + Multiset<Tuple> expected = ImmutableMultiset.of( + tf.newTuple(ImmutableList.of((long) 1, "G", 10, "V")), + tf.newTuple(ImmutableList.of((long) 2, "F", 8, "T")), + tf.newTuple(ImmutableList.of((long) 2, "F", 8, "Q")), + tf.newTuple(ImmutableList.of((long) 2, "F", 8, "Q")), + tf.newTuple(ImmutableList.of((long) 2, "F", 7, "Q")), + tf.newTuple(ImmutableList.of((long) 6, "E", 4, "Q")), + tf.newTuple(ImmutableList.of((long) 6, "E", 4, "Q")), + tf.newTuple(ImmutableList.of((long) 8, "D", 4, "P")), + tf.newTuple(ImmutableList.of((long) 9, "C", 3, "M")), + tf.newTuple(ImmutableList.of((long) 10, "B", 2, "N")), + tf.newTuple(ImmutableList.of((long) 11, "A", 1, "N"))); + + verifyExpected(data.get("result"), expected); } @Test @@ -187,22 +199,23 @@ public class TestRank1 { + "store C into 'result' using mock.Storage();"; Util.registerMultiLineQuery(pigServer, query); - List<Tuple> expected = Util.getTuplesFromConstantTupleStrings(new String[]{ - "(1L,'Michael','Blythe',1,1,1,1,4557045.046,98027)", - "(1L,'Linda','Mitchell',2,1,1,1,5200475.231,98027)", - "(1L,'Jillian','Carson',3,1,1,1,3857163.633,98027)", - "(1L,'Garrett','Vargas',4,1,1,1,1764938.986,98027)", - "(1L,'Tsvi','Reiter',5,1,1,2,2811012.715,98027)", - "(6L,'Shu','Ito',6,6,2,2,3018725.486,98055)", - "(6L,'Jose','Saraiva',7,6,2,2,3189356.247,98055)", - "(6L,'David','Campbell',8,6,2,3,3587378.426,98055)", - "(6L,'Tete','Mensa-Annan',9,6,2,3,1931620.184,98055)", - "(6L,'Lynn','Tsoflias',10,6,2,3,1758385.926,98055)", - "(6L,'Rachel','Valdez',11,6,2,4,2241204.042,98055)", - "(6L,'Jae','Pak',12,6,2,4,5015682.375,98055)", - "(6L,'Ranjit','Varkey Chudukatil',13,6,2,4,3827950.238,98055)", - }); - Util.checkQueryOutputsAfterSort(data.get("result"), expected); + + Multiset<Tuple> expected = ImmutableMultiset.of( + tf.newTuple(ImmutableList.of((long) 1, "Michael", "Blythe", 1,1, 1, 1, 4557045.046, 98027)), + tf.newTuple(ImmutableList.of((long) 1, "Linda","Mitchell", 2, 1, 1, 1, 5200475.231, 98027)), + tf.newTuple(ImmutableList.of((long) 1, "Jillian", "Carson", 3,1, 1, 1, 3857163.633, 98027)), + tf.newTuple(ImmutableList.of((long) 1, "Garrett","Vargas", 4, 1, 1, 1, 1764938.986, 98027)), + tf.newTuple(ImmutableList.of((long) 1, "Tsvi", "Reiter",5, 1, 1, 2, 2811012.715, 98027)), + tf.newTuple(ImmutableList.of((long) 6, "Shu", "Ito", 6,6, 2, 2, 3018725.486, 98055)), + tf.newTuple(ImmutableList.of((long) 6, "Jose", "Saraiva",7, 6, 2, 2, 3189356.247, 98055)), + tf.newTuple(ImmutableList.of((long) 6, "David","Campbell", 8, 6, 2, 3, 3587378.426, 98055)), + tf.newTuple(ImmutableList.of((long) 6, "Tete", "Mensa-Annan",9, 6, 2, 3, 1931620.184, 98055)), + tf.newTuple(ImmutableList.of((long) 6, "Lynn","Tsoflias", 10, 6, 2, 3, 1758385.926, 98055)), + tf.newTuple(ImmutableList.of((long) 6, "Rachel", "Valdez", 11,6, 2, 4, 2241204.042, 98055)), + tf.newTuple(ImmutableList.of((long) 6, "Jae", "Pak", 12,6, 2, 4, 5015682.375, 98055)), + tf.newTuple(ImmutableList.of((long) 6, "Ranjit","Varkey Chudukatil", 13, 6, 2, 4, 3827950.238,98055))); + + verifyExpected(data.get("result"), expected); } @Test @@ -212,22 +225,23 @@ public class TestRank1 { + "store C into 'result' using mock.Storage();"; Util.registerMultiLineQuery(pigServer, query); - List<Tuple> expected = Util.getTuplesFromConstantTupleStrings(new String[]{ - "(1L,'David','Campbell',8,6,2,3,3587378.426,98055)", - "(2L,'Garrett','Vargas',4,1,1,1,1764938.986,98027)", - "(3L,'Jae','Pak',12,6,2,4,5015682.375,98055)", - "(4L,'Jillian','Carson',3,1,1,1,3857163.633,98027)", - "(5L,'Jose','Saraiva',7,6,2,2,3189356.247,98055)", - "(6L,'Linda','Mitchell',2,1,1,1,5200475.231,98027)", - "(7L,'Lynn','Tsoflias',10,6,2,3,1758385.926,98055)", - "(8L,'Michael','Blythe',1,1,1,1,4557045.046,98027)", - "(9L,'Rachel','Valdez',11,6,2,4,2241204.042,98055)", - "(10L,'Ranjit','Varkey Chudukatil',13,6,2,4,3827950.238,98055)", - "(11L,'Shu','Ito',6,6,2,2,3018725.486,98055)", - "(12L,'Tete','Mensa-Annan',9,6,2,3,1931620.184,98055)", - "(13L,'Tsvi','Reiter',5,1,1,2,2811012.715,98027)" - }); - Util.checkQueryOutputsAfterSort(data.get("result"), expected); + + Multiset<Tuple> expected = ImmutableMultiset.of( + tf.newTuple(ImmutableList.of((long) 1, "David", "Campbell", 8,6, 2, 3, 3587378.426, 98055)), + tf.newTuple(ImmutableList.of((long) 2, "Garrett","Vargas", 4, 1, 1, 1, 1764938.986, 98027)), + tf.newTuple(ImmutableList.of((long) 3, "Jae", "Pak", 12,6, 2, 4, 5015682.375, 98055)), + tf.newTuple(ImmutableList.of((long) 4, "Jillian","Carson", 3, 1, 1, 1, 3857163.633, 98027)), + tf.newTuple(ImmutableList.of((long) 5, "Jose", "Saraiva",7, 6, 2, 2, 3189356.247, 98055)), + tf.newTuple(ImmutableList.of((long) 6, "Linda","Mitchell", 2, 1, 1, 1, 5200475.231, 98027)), + tf.newTuple(ImmutableList.of((long) 7, "Lynn", "Tsoflias", 10,6, 2, 3, 1758385.926, 98055)), + tf.newTuple(ImmutableList.of((long) 8, "Michael","Blythe", 1, 1, 1, 1, 4557045.046, 98027)), + tf.newTuple(ImmutableList.of((long) 9, "Rachel","Valdez", 11, 6, 2, 4, 2241204.042, 98055)), + tf.newTuple(ImmutableList.of((long) 10, "Ranjit","Varkey Chudukatil", 13, 6, 2, 4, 3827950.238, 98055)), + tf.newTuple(ImmutableList.of((long) 11, "Shu", "Ito", 6, 6, 2, 2, 3018725.486,98055)), + tf.newTuple(ImmutableList.of((long) 12, "Tete", "Mensa-Annan", 9, 6, 2, 3,1931620.184, 98055)), + tf.newTuple(ImmutableList.of((long) 13, "Tsvi", "Reiter", 5, 1, 1, 2, 2811012.715,98027))); + + verifyExpected(data.get("result"), expected); } @Test @@ -237,22 +251,23 @@ public class TestRank1 { + "store C into 'result' using mock.Storage();"; Util.registerMultiLineQuery(pigServer, query); - List<Tuple> expected = Util.getTuplesFromConstantTupleStrings(new String[]{ - "(1L,'David','Campbell',8,6,2,3,3587378.426,98055)", - "(2L,'Garrett','Vargas',4,1,1,1,1764938.986,98027)", - "(3L,'Jae','Pak',12,6,2,4,5015682.375,98055)", - "(4L,'Jillian','Carson',3,1,1,1,3857163.633,98027)", - "(5L,'Jose','Saraiva',7,6,2,2,3189356.247,98055)", - "(6L,'Linda','Mitchell',2,1,1,1,5200475.231,98027)", - "(7L,'Lynn','Tsoflias',10,6,2,3,1758385.926,98055)", - "(8L,'Michael','Blythe',1,1,1,1,4557045.046,98027)", - "(9L,'Rachel','Valdez',11,6,2,4,2241204.042,98055)", - "(10L,'Ranjit','Varkey Chudukatil',13,6,2,4,3827950.238,98055)", - "(11L,'Shu','Ito',6,6,2,2,3018725.486,98055)", - "(12L,'Tete','Mensa-Annan',9,6,2,3,1931620.184,98055)", - "(13L,'Tsvi','Reiter',5,1,1,2,2811012.715,98027)" - }); - Util.checkQueryOutputsAfterSort(data.get("result"), expected); + + Multiset<Tuple> expected = ImmutableMultiset.of( + tf.newTuple(ImmutableList.of((long) 1, "David", "Campbell", 8, 6, 2, 3, 3587378.426, 98055)), + tf.newTuple(ImmutableList.of((long) 2, "Garrett","Vargas", 4, 1, 1, 1, 1764938.986, 98027)), + tf.newTuple(ImmutableList.of((long) 3, "Jae", "Pak", 12,6, 2, 4, 5015682.375, 98055)), + tf.newTuple(ImmutableList.of((long) 4, "Jillian","Carson", 3, 1, 1, 1, 3857163.633, 98027)), + tf.newTuple(ImmutableList.of((long) 5, "Jose", "Saraiva",7, 6, 2, 2, 3189356.247, 98055)), + tf.newTuple(ImmutableList.of((long) 6, "Linda","Mitchell", 2, 1, 1, 1, 5200475.231, 98027)), + tf.newTuple(ImmutableList.of((long) 7, "Lynn", "Tsoflias", 10,6, 2, 3, 1758385.926, 98055)), + tf.newTuple(ImmutableList.of((long) 8, "Michael","Blythe", 1, 1, 1, 1, 4557045.046, 98027)), + tf.newTuple(ImmutableList.of((long) 9, "Rachel","Valdez", 11, 6, 2, 4, 2241204.042, 98055)), + tf.newTuple(ImmutableList.of((long) 10, "Ranjit","Varkey Chudukatil", 13, 6, 2, 4, 3827950.238, 98055)), + tf.newTuple(ImmutableList.of((long) 11, "Shu", "Ito", 6, 6, 2, 2, 3018725.486,98055)), + tf.newTuple(ImmutableList.of((long) 12, "Tete", "Mensa-Annan", 9, 6, 2, 3,1931620.184, 98055)), + tf.newTuple(ImmutableList.of((long) 13, "Tsvi", "Reiter", 5, 1, 1, 2, 2811012.715,98027))); + + verifyExpected(data.get("result"), expected); } @Test @@ -262,20 +277,38 @@ public class TestRank1 { + "store C into 'result' using mock.Storage();"; Util.registerMultiLineQuery(pigServer, query); - List<Tuple> expected = Util.getTuplesFromConstantTupleStrings(new String[]{ - "(1L,'A',1,'N')", - "(2L,'B',2,'N')", - "(3L,'C',3,'M')", - "(4L,'D',4,'P')", - "(5L,'E',4,'Q')", - "(5L,'E',4,'Q')", - "(7L,'F',7,'Q')", - "(8L,'F',8,'Q')", - "(8L,'F',8,'Q')", - "(10L,'F',8,'T')", - "(11L,'G',10,'V')" - }); - Util.checkQueryOutputsAfterSort(data.get("result"), expected); + + Multiset<Tuple> expected = ImmutableMultiset.of( + tf.newTuple(ImmutableList.of((long) 1, "A", 1, "N")), + tf.newTuple(ImmutableList.of((long) 2, "B", 2, "N")), + tf.newTuple(ImmutableList.of((long) 3, "C", 3, "M")), + tf.newTuple(ImmutableList.of((long) 4, "D", 4, "P")), + tf.newTuple(ImmutableList.of((long) 5, "E", 4, "Q")), + tf.newTuple(ImmutableList.of((long) 5, "E", 4, "Q")), + tf.newTuple(ImmutableList.of((long) 7, "F", 7, "Q")), + tf.newTuple(ImmutableList.of((long) 8, "F", 8, "Q")), + tf.newTuple(ImmutableList.of((long) 8, "F", 8, "Q")), + tf.newTuple(ImmutableList.of((long) 10, "F", 8, "T")), + tf.newTuple(ImmutableList.of((long) 11, "G", 10, "V"))); + + verifyExpected(data.get("result"), expected); } + public void verifyExpected(List<Tuple> out, Multiset<Tuple> expected) { + Multiset<Tuple> resultMultiset = TreeMultiset.create(); + for (Tuple tup : out) { + resultMultiset.add(tup); + } + + StringBuilder error = new StringBuilder("Result does not match.\nActual result:\n"); + for (Tuple tup : resultMultiset.elementSet() ) { + error.append(tup).append(" x ").append(resultMultiset.count(tup)).append("\n"); + } + error.append("Expceted result:\n"); + for (Tuple tup : ImmutableSortedSet.copyOf(expected) ) { + error.append(tup).append(" x ").append(expected.count(tup)).append("\n"); + } + + assertTrue(error.toString(), resultMultiset.equals(expected)); + } } Modified: pig/branches/spark/test/org/apache/pig/test/TestRank2.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestRank2.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestRank2.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestRank2.java Fri Mar 4 18:17:39 2016 @@ -19,6 +19,7 @@ package org.apache.pig.test; import static org.apache.pig.builtin.mock.Storage.resetData; import static org.apache.pig.builtin.mock.Storage.tuple; +import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.List; @@ -30,6 +31,11 @@ import org.apache.pig.data.TupleFactory; import org.junit.Before; import org.junit.Test; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMultiset; +import com.google.common.collect.ImmutableSortedSet; +import com.google.common.collect.TreeMultiset; +import com.google.common.collect.Multiset; public class TestRank2 { private static PigServer pigServer; @@ -71,20 +77,21 @@ public class TestRank2 { + "store C into 'result' using mock.Storage();"; Util.registerMultiLineQuery(pigServer, query); - List<Tuple> expected = Util.getTuplesFromConstantTupleStrings(new String[]{ - "(1L,'C',3,'M')", - "(2L,'A',1,'N')", - "(2L,'B',2,'N')", - "(3L,'D',4,'P')", - "(4L,'E',4,'Q')", - "(4L,'E',4,'Q')", - "(4L,'F',8,'Q')", - "(4L,'F',7,'Q')", - "(4L,'F',8,'Q')", - "(5L,'F',8,'T')", - "(6L,'G',10,'V')" - }); - Util.checkQueryOutputsAfterSort(data.get("result"), expected); + + Multiset<Tuple> expected = ImmutableMultiset.of( + tf.newTuple(ImmutableList.of((long) 1, "C", 3, "M")), + tf.newTuple(ImmutableList.of((long) 2, "A", 1, "N")), + tf.newTuple(ImmutableList.of((long) 2, "B", 2, "N")), + tf.newTuple(ImmutableList.of((long) 3, "D", 4, "P")), + tf.newTuple(ImmutableList.of((long) 4, "E", 4, "Q")), + tf.newTuple(ImmutableList.of((long) 4, "E", 4, "Q")), + tf.newTuple(ImmutableList.of((long) 4, "F", 8, "Q")), + tf.newTuple(ImmutableList.of((long) 4, "F", 7, "Q")), + tf.newTuple(ImmutableList.of((long) 4, "F", 8, "Q")), + tf.newTuple(ImmutableList.of((long) 5, "F", 8, "T")), + tf.newTuple(ImmutableList.of((long) 6, "G", 10, "V"))); + + verifyExpected(data.get("result"), expected); } @Test @@ -94,20 +101,21 @@ public class TestRank2 { + "store C into 'result' using mock.Storage();"; Util.registerMultiLineQuery(pigServer, query); - List<Tuple> expected = Util.getTuplesFromConstantTupleStrings(new String[]{ - "(1L,'A',1,'N')", - "(2L,'B',2,'N')", - "(3L,'C',3,'M')", - "(4L,'D',4,'P')", - "(4L,'E',4,'Q')", - "(4L,'E',4,'Q')", - "(5L,'F',7,'Q')", - "(6L,'F',8,'Q')", - "(6L,'F',8,'Q')", - "(6L,'F',8,'T')", - "(7L,'G',10,'V')" - }); - Util.checkQueryOutputsAfterSort(data.get("result"), expected); + + Multiset<Tuple> expected = ImmutableMultiset.of( + tf.newTuple(ImmutableList.of((long) 1, "A", 1, "N")), + tf.newTuple(ImmutableList.of((long) 2, "B", 2, "N")), + tf.newTuple(ImmutableList.of((long) 3, "C", 3, "M")), + tf.newTuple(ImmutableList.of((long) 4, "D", 4, "P")), + tf.newTuple(ImmutableList.of((long) 4, "E", 4, "Q")), + tf.newTuple(ImmutableList.of((long) 4, "E", 4, "Q")), + tf.newTuple(ImmutableList.of((long) 5, "F", 7, "Q")), + tf.newTuple(ImmutableList.of((long) 6, "F", 8, "Q")), + tf.newTuple(ImmutableList.of((long) 6, "F", 8, "Q")), + tf.newTuple(ImmutableList.of((long) 6, "F", 8, "T")), + tf.newTuple(ImmutableList.of((long) 7, "G", 10, "V"))); + + verifyExpected(data.get("result"), expected); } @Test @@ -117,20 +125,21 @@ public class TestRank2 { + "store C into 'result' using mock.Storage();"; Util.registerMultiLineQuery(pigServer, query); - List<Tuple> expected = Util.getTuplesFromConstantTupleStrings(new String[]{ - "(1L,'G',10,'V')", - "(2L,'F',8,'T')", - "(2L,'F',8,'Q')", - "(2L,'F',8,'Q')", - "(2L,'F',7,'Q')", - "(3L,'E',4,'Q')", - "(3L,'E',4,'Q')", - "(4L,'D',4,'P')", - "(5L,'C',3,'M')", - "(6L,'B',2,'N')", - "(7L,'A',1,'N')" - }); - Util.checkQueryOutputsAfterSort(data.get("result"), expected); + + Multiset<Tuple> expected = ImmutableMultiset.of( + tf.newTuple(ImmutableList.of((long) 1, "G", 10, "V")), + tf.newTuple(ImmutableList.of((long) 2, "F", 8, "T")), + tf.newTuple(ImmutableList.of((long) 2, "F", 8, "Q")), + tf.newTuple(ImmutableList.of((long) 2, "F", 8, "Q")), + tf.newTuple(ImmutableList.of((long) 2, "F", 7, "Q")), + tf.newTuple(ImmutableList.of((long) 3, "E", 4, "Q")), + tf.newTuple(ImmutableList.of((long) 3, "E", 4, "Q")), + tf.newTuple(ImmutableList.of((long) 4, "D", 4, "P")), + tf.newTuple(ImmutableList.of((long) 5, "C", 3, "M")), + tf.newTuple(ImmutableList.of((long) 6, "B", 2, "N")), + tf.newTuple(ImmutableList.of((long) 7, "A", 1, "N"))); + + verifyExpected(data.get("result"), expected); } @Test @@ -140,20 +149,40 @@ public class TestRank2 { + "store C into 'result' using mock.Storage();"; Util.registerMultiLineQuery(pigServer, query); - List<Tuple> expected = Util.getTuplesFromConstantTupleStrings(new String[]{ - "(1L,'A',1,'N')", - "(2L,'B',2,'N')", - "(3L,'C',3,'M')", - "(4L,'D',4,'P')", - "(5L,'E',4,'Q')", - "(5L,'E',4,'Q')", - "(6L,'F',8,'Q')", - "(6L,'F',8,'Q')", - "(6L,'F',8,'T')", - "(7L,'F',7,'Q')", - "(8L,'G',10,'V')" - }); - Util.checkQueryOutputsAfterSort(data.get("result"), expected); + + Multiset<Tuple> expected = ImmutableMultiset.of( + tf.newTuple(ImmutableList.of((long) 1, "A", 1, "N")), + tf.newTuple(ImmutableList.of((long) 2, "B", 2, "N")), + tf.newTuple(ImmutableList.of((long) 3, "C", 3, "M")), + tf.newTuple(ImmutableList.of((long) 4, "D", 4, "P")), + tf.newTuple(ImmutableList.of((long) 5, "E", 4, "Q")), + tf.newTuple(ImmutableList.of((long) 5, "E", 4, "Q")), + tf.newTuple(ImmutableList.of((long) 6, "F", 8, "Q")), + tf.newTuple(ImmutableList.of((long) 6, "F", 8, "Q")), + tf.newTuple(ImmutableList.of((long) 6, "F", 8, "T")), + tf.newTuple(ImmutableList.of((long) 7, "F", 7, "Q")), + tf.newTuple(ImmutableList.of((long) 8, "G", 10, "V"))); + + verifyExpected(data.get("result"), expected); + } + + public void verifyExpected(List<Tuple> out, Multiset<Tuple> expected) { + Multiset<Tuple> resultMultiset = TreeMultiset.create(); + for (Tuple tup : out) { + resultMultiset.add(tup); + } + + StringBuilder error = new StringBuilder("Result does not match.\nActual result:\n"); + for (Tuple tup : resultMultiset.elementSet() ) { + error.append(tup).append(" x ").append(resultMultiset.count(tup)).append("\n"); + } + error.append("Expceted result:\n"); + for (Tuple tup : ImmutableSortedSet.copyOf(expected) ) { + error.append(tup).append(" x ").append(expected.count(tup)).append("\n"); + } + + //This one line test should be sufficient but adding the above + //for-loop for better error messages + assertTrue(error.toString(), resultMultiset.equals(expected)); } - -} \ No newline at end of file +} Modified: pig/branches/spark/test/org/apache/pig/test/TestRank3.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestRank3.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestRank3.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestRank3.java Fri Mar 4 18:17:39 2016 @@ -19,6 +19,7 @@ package org.apache.pig.test; import static org.apache.pig.builtin.mock.Storage.resetData; import static org.apache.pig.builtin.mock.Storage.tuple; +import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.List; @@ -32,6 +33,11 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMultiset; +import com.google.common.collect.ImmutableSortedSet; +import com.google.common.collect.TreeMultiset; +import com.google.common.collect.Multiset; public class TestRank3 { private static PigServer pigServer; @@ -108,39 +114,41 @@ public class TestRank3 { + "store R8 into 'result' using mock.Storage();"; Util.registerMultiLineQuery(pigServer, query); - List<Tuple> expected = Util.getTuplesFromConstantTupleStrings(new String[]{ - "(1L,21L,5L,7L,1L,1L,0L,8L,8L)", - "(2L,26L,2L,3L,2L,5L,1L,9L,10L)", - "(3L,30L,24L,21L,2L,3L,1L,3L,10L)", - "(4L,6L,10L,8L,3L,4L,1L,7L,2L)", - "(5L,8L,28L,25L,3L,2L,1L,0L,2L)", - "(6L,28L,11L,12L,4L,6L,2L,7L,10L)", - "(7L,9L,26L,22L,5L,7L,3L,2L,3L)", - "(8L,5L,6L,5L,6L,8L,3L,8L,1L)", - "(9L,29L,16L,15L,7L,9L,4L,6L,10L)", - "(10L,18L,12L,10L,8L,11L,5L,7L,6L)", - "(11L,14L,17L,14L,9L,10L,5L,6L,5L)", - "(12L,6L,12L,8L,10L,11L,5L,7L,2L)", - "(13L,2L,17L,13L,11L,10L,5L,6L,0L)", - "(14L,26L,3L,3L,12L,14L,6L,9L,10L)", - "(15L,15L,20L,18L,13L,13L,6L,4L,5L)", - "(16L,3L,29L,24L,14L,12L,6L,0L,0L)", - "(17L,23L,21L,19L,15L,16L,7L,4L,8L)", - "(18L,19L,19L,16L,16L,17L,7L,5L,6L)", - "(19L,20L,30L,26L,16L,15L,7L,0L,6L)", - "(20L,12L,21L,17L,17L,16L,7L,4L,4L)", - "(21L,4L,1L,1L,18L,19L,7L,10L,1L)", - "(22L,1L,7L,4L,19L,18L,7L,8L,0L)", - "(23L,24L,14L,11L,20L,21L,8L,7L,9L)", - "(24L,16L,25L,20L,21L,20L,8L,3L,5L)", - "(25L,25L,27L,23L,22L,22L,9L,1L,9L)", - "(26L,21L,8L,7L,23L,25L,9L,8L,8L)", - "(27L,17L,4L,2L,24L,26L,9L,9L,6L)", - "(28L,10L,8L,6L,25L,25L,9L,8L,4L)", - "(29L,11L,15L,9L,25L,24L,9L,7L,4L)", - "(30L,12L,23L,17L,25L,23L,9L,4L,4L)" - }); - Util.checkQueryOutputsAfterSort(data.get("result"), expected); + + Multiset<Tuple> expected = ImmutableMultiset.of( + tf.newTuple(ImmutableList.of(1L,21L,5L,7L,1L,1L,0L,8L,8L)), + tf.newTuple(ImmutableList.of(2L,26L,2L,3L,2L,5L,1L,9L,10L)), + tf.newTuple(ImmutableList.of(3L,30L,24L,21L,2L,3L,1L,3L,10L)), + tf.newTuple(ImmutableList.of(4L,6L,10L,8L,3L,4L,1L,7L,2L)), + tf.newTuple(ImmutableList.of(5L,8L,28L,25L,3L,2L,1L,0L,2L)), + tf.newTuple(ImmutableList.of(6L,28L,11L,12L,4L,6L,2L,7L,10L)), + tf.newTuple(ImmutableList.of(7L,9L,26L,22L,5L,7L,3L,2L,3L)), + tf.newTuple(ImmutableList.of(8L,5L,6L,5L,6L,8L,3L,8L,1L)), + tf.newTuple(ImmutableList.of(9L,29L,16L,15L,7L,9L,4L,6L,10L)), + tf.newTuple(ImmutableList.of(10L,18L,12L,10L,8L,11L,5L,7L,6L)), + tf.newTuple(ImmutableList.of(11L,14L,17L,14L,9L,10L,5L,6L,5L)), + tf.newTuple(ImmutableList.of(12L,6L,12L,8L,10L,11L,5L,7L,2L)), + tf.newTuple(ImmutableList.of(13L,2L,17L,13L,11L,10L,5L,6L,0L)), + tf.newTuple(ImmutableList.of(14L,26L,3L,3L,12L,14L,6L,9L,10L)), + tf.newTuple(ImmutableList.of(15L,15L,20L,18L,13L,13L,6L,4L,5L)), + tf.newTuple(ImmutableList.of(16L,3L,29L,24L,14L,12L,6L,0L,0L)), + tf.newTuple(ImmutableList.of(17L,23L,21L,19L,15L,16L,7L,4L,8L)), + tf.newTuple(ImmutableList.of(18L,19L,19L,16L,16L,17L,7L,5L,6L)), + tf.newTuple(ImmutableList.of(19L,20L,30L,26L,16L,15L,7L,0L,6L)), + tf.newTuple(ImmutableList.of(20L,12L,21L,17L,17L,16L,7L,4L,4L)), + tf.newTuple(ImmutableList.of(21L,4L,1L,1L,18L,19L,7L,10L,1L)), + tf.newTuple(ImmutableList.of(22L,1L,7L,4L,19L,18L,7L,8L,0L)), + tf.newTuple(ImmutableList.of(23L,24L,14L,11L,20L,21L,8L,7L,9L)), + tf.newTuple(ImmutableList.of(24L,16L,25L,20L,21L,20L,8L,3L,5L)), + tf.newTuple(ImmutableList.of(25L,25L,27L,23L,22L,22L,9L,1L,9L)), + tf.newTuple(ImmutableList.of(26L,21L,8L,7L,23L,25L,9L,8L,8L)), + tf.newTuple(ImmutableList.of(27L,17L,4L,2L,24L,26L,9L,9L,6L)), + tf.newTuple(ImmutableList.of(28L,10L,8L,6L,25L,25L,9L,8L,4L)), + tf.newTuple(ImmutableList.of(29L,11L,15L,9L,25L,24L,9L,7L,4L)), + tf.newTuple(ImmutableList.of(30L,12L,23L,17L,25L,23L,9L,4L,4L)) + ); + + verifyExpected(data.get("result"), expected); } // See PIG-3726 @@ -151,8 +159,9 @@ public class TestRank3 { + "store A into 'empty_result' using mock.Storage();"; Util.registerMultiLineQuery(pigServer, query); - List<Tuple> expected = Util.getTuplesFromConstantTupleStrings(new String[]{}); - Util.checkQueryOutputsAfterSort(data.get("empty_result"), expected); + + Multiset<Tuple> expected = ImmutableMultiset.of(); + verifyExpected(data.get("empty_result"), expected); } @Test @@ -188,4 +197,24 @@ public class TestRank3 { Util.checkQueryOutputsAfterSort(data.get("R4"), expectedResults); } + public void verifyExpected(List<Tuple> out, Multiset<Tuple> expected) { + Multiset<Tuple> resultMultiset = TreeMultiset.create(); + for (Tuple tup : out) { + resultMultiset.add(tup); + } + + StringBuilder error = new StringBuilder("Result does not match.\nActual result:\n"); + for (Tuple tup : resultMultiset.elementSet() ) { + error.append(tup).append(" x ").append(resultMultiset.count(tup)).append("\n"); + } + error.append("Expceted result:\n"); + for (Tuple tup : ImmutableSortedSet.copyOf(expected) ) { + error.append(tup).append(" x ").append(expected.count(tup)).append("\n"); + } + + //This one line test should be sufficient but adding the above + //for-loop for better error messages + assertTrue(error.toString(), resultMultiset.equals(expected)); + } + } Modified: pig/branches/spark/test/org/apache/pig/test/TestRegisteredJarVisibility.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestRegisteredJarVisibility.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestRegisteredJarVisibility.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestRegisteredJarVisibility.java Fri Mar 4 18:17:39 2016 @@ -24,12 +24,9 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.util.Arrays; -import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.jar.JarFile; import java.util.jar.JarOutputStream; import java.util.zip.ZipEntry; @@ -38,17 +35,9 @@ import javax.tools.JavaFileObject; import javax.tools.StandardJavaFileManager; import javax.tools.ToolProvider; -import org.apache.commons.io.FileUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.log4j.FileAppender; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; -import org.apache.log4j.SimpleLayout; -import org.apache.pig.PigConfiguration; import org.apache.pig.PigServer; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler; -import org.apache.pig.data.Tuple; import org.apache.pig.impl.util.JarManager; import org.junit.AfterClass; import org.junit.Assert; @@ -73,16 +62,15 @@ public class TestRegisteredJarVisibility private static MiniGenericCluster cluster; private static File jarFile; + private static File testDataDir; @BeforeClass() public static void setUp() throws IOException { String testResourcesDir = "test/resources/" + PACKAGE_NAME.replace(".", "/"); - String testBuildDataDir = "build/test/data"; // Create the test data directory if needed - File testDataDir = new File(testBuildDataDir, - TestRegisteredJarVisibility.class.getCanonicalName()); + testDataDir = new File(Util.getTestDirectory(TestRegisteredJarVisibility.class)); testDataDir.mkdirs(); jarFile = new File(testDataDir, JAR_FILE_NAME); @@ -111,6 +99,7 @@ public class TestRegisteredJarVisibility @AfterClass() public static void tearDown() { cluster.shutDown(); + Util.deleteDirectory(testDataDir); } @Before @@ -163,7 +152,7 @@ public class TestRegisteredJarVisibility // When jackson jar is not registered, jackson-core from the first jar in // classpath (pig.jar) should be picked up (version 1.8.8 in this case). String jacksonJar = JarManager.findContainingJar(org.codehaus.jackson.JsonParser.class); - Assert.assertTrue(new File(jacksonJar).getName().contains("1.8.8")); + Assert.assertTrue(new File(jacksonJar).getName().contains("1.9.13")); PigServer pigServer = new PigServer(Util.getLocalTestMode(), new Properties()); pigServer.registerJar("test/resources/jackson-core-asl-1.9.9.jar"); Modified: pig/branches/spark/test/org/apache/pig/test/TestScalarAliases.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestScalarAliases.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestScalarAliases.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestScalarAliases.java Fri Mar 4 18:17:39 2016 @@ -28,7 +28,6 @@ import java.util.Iterator; import org.apache.pig.PigServer; import org.apache.pig.data.Tuple; import org.junit.AfterClass; -import org.junit.Assume; import org.junit.Test; public class TestScalarAliases { @@ -93,7 +92,6 @@ public class TestScalarAliases { @Test public void testScalarErrMultipleRowsInInput() throws Exception{ - Assume.assumeTrue("Skip this test for TEZ. See PIG-3994", Util.isMapredExecType(cluster.getExecType())); Util.resetStateForExecModeSwitch(); pigServer = new PigServer(cluster.getExecType(), cluster.getProperties()); String[] input = { Modified: pig/branches/spark/test/org/apache/pig/test/TestScalarAliasesLocal.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestScalarAliasesLocal.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestScalarAliasesLocal.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestScalarAliasesLocal.java Fri Mar 4 18:17:39 2016 @@ -25,8 +25,10 @@ import static org.junit.Assert.fail; import java.io.File; import java.io.IOException; import java.util.Iterator; +import java.util.List; import org.apache.pig.PigServer; +import org.apache.pig.builtin.mock.Storage; import org.apache.pig.data.BagFactory; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; @@ -556,4 +558,39 @@ public class TestScalarAliasesLocal { ); } + @Test + public void testScalarNullValue() throws Exception{ + Storage.Data data = Storage.resetData(pigServer); + data.set("input", Storage.tuple("a", 1), Storage.tuple("b", 2)); + + pigServer.setBatchOn(); + pigServer.registerQuery("A = load 'input' using mock.Storage() as (a:chararray, b:int);"); + pigServer.registerQuery("B = FILTER A by a == 'c';"); + pigServer.registerQuery("C = FOREACH A generate a, b + B.b;"); + pigServer.registerQuery("store C into 'output' using mock.Storage();"); + + pigServer.executeBatch(); + + List<Tuple> actualResults = data.get("output"); + List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings( + new String[] {"('a', null)", "('b', null)"}); + Util.checkQueryOutputsAfterSort(actualResults.iterator(), expectedResults); + + } + + @Test + public void testScalarNullValue2() throws Exception{ + Storage.Data data = Storage.resetData(pigServer); + data.set("input", Storage.tuple("a", 1), Storage.tuple("b", 2)); + + pigServer.registerQuery("A = load 'input' using mock.Storage() as (a:chararray, b:int);"); + pigServer.registerQuery("B = FILTER A by a == 'c';"); + pigServer.registerQuery("C = GROUP B ALL;"); + pigServer.registerQuery("D = FOREACH C GENERATE COUNT(B.b) as count;"); + pigServer.registerQuery("E = FOREACH A GENERATE (D.count IS NOT NULL? D.count : 0l);;"); + + Iterator<Tuple> iter = pigServer.openIterator("E"); + Tuple t = iter.next(); + assertTrue(t.toString().equals("(0)")); + } } Modified: pig/branches/spark/test/org/apache/pig/test/TestSecondarySort.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestSecondarySort.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestSecondarySort.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestSecondarySort.java Fri Mar 4 18:17:39 2016 @@ -442,6 +442,38 @@ public abstract class TestSecondarySort } + @Test + public void testNestedSortMultiQueryEndToEnd3() throws Exception { + File input1 = Util.createTempFileDelOnExit("test", "txt"); + PrintStream ps1 = new PrintStream(new FileOutputStream(input1)); + ps1.println("a\t0"); + ps1.println("a\t2"); + ps1.println("a\t1"); + ps1.close(); + Util.copyFromLocalToCluster(cluster, input1.getCanonicalPath(), "testNestedSortMultiQueryEndToEnd3-input-1.txt"); + + File input2 = Util.createTempFileDelOnExit("test", "txt"); + PrintStream ps2 = new PrintStream(new FileOutputStream(input2)); + ps2.println("a"); + ps2.close(); + Util.copyFromLocalToCluster(cluster, input2.getCanonicalPath(), "testNestedSortMultiQueryEndToEnd3-input-2.txt"); + + try { + pigServer.setBatchOn(); + pigServer.registerQuery("a = load 'testNestedSortMultiQueryEndToEnd3-input-1.txt' as (a0:chararray, a1:chararray);"); + pigServer.registerQuery("b = load 'testNestedSortMultiQueryEndToEnd3-input-2.txt' as (b0);"); + pigServer.registerQuery("c = cogroup b by b0, a by a0;"); + pigServer.registerQuery("d = foreach c {a_sorted = order a by a1 desc;generate group, a_sorted, b;}"); + Iterator<Tuple> iter = pigServer.openIterator("d"); + + assertEquals(iter.next().toString(), "(a,{(a,2),(a,1),(a,0)},{(a)})"); + } finally { + Util.deleteFile(cluster, "testNestedSortMultiQueryEndToEnd3-input-1.txt"); + Util.deleteFile(cluster, "testNestedSortMultiQueryEndToEnd3-input-2.txt"); + } + + } + // See PIG-1978 @Test public void testForEachTwoInput() throws Exception {
