Modified: pig/branches/spark/test/org/apache/pig/test/TestCounters.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestCounters.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestCounters.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestCounters.java Fri Feb 24 03:34:37 2017 @@ -30,17 +30,17 @@ import java.util.Map; import java.util.Random; import org.apache.hadoop.fs.Path; +import org.apache.pig.ExecType; import org.apache.pig.PigServer; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.executionengine.ExecJob; import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.tools.pigstats.InputStats; import org.apache.pig.tools.pigstats.JobStats; -import org.apache.pig.tools.pigstats.OutputStats; import org.apache.pig.tools.pigstats.PigStats; import org.apache.pig.tools.pigstats.PigStats.JobGraph; +import org.apache.pig.tools.pigstats.mapreduce.MRJobStats; import org.junit.AfterClass; -import org.junit.Assume; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -49,8 +49,8 @@ import org.junit.runners.JUnit4; public class TestCounters { String file = "input.txt"; - static MiniGenericCluster cluster = MiniGenericCluster.buildCluster(); - + static MiniCluster cluster = MiniCluster.buildCluster(); + final int MAX = 100*1000; Random r = new Random(); @@ -59,7 +59,7 @@ public class TestCounters { public static void oneTimeTearDown() throws Exception { cluster.shutDown(); } - + @Test public void testMapOnly() throws IOException, ExecException { int count = 0; @@ -70,13 +70,13 @@ public class TestCounters { if(t > 50) count ++; } pw.close(); - PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties()); + PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); pigServer.registerQuery("a = load '" + file + "';"); pigServer.registerQuery("b = filter a by $0 > 50;"); pigServer.registerQuery("c = foreach b generate $0 - 50;"); ExecJob job = pigServer.store("c", "output_map_only"); PigStats pigStats = job.getStatistics(); - + //counting the no. of bytes in the output file //long filesize = cluster.getFileSystem().getFileStatus(new Path("output_map_only")).getLen(); InputStream is = FileLocalizer.open(FileLocalizer.fullPath( @@ -85,9 +85,9 @@ public class TestCounters { long filesize = 0; while(is.read() != -1) filesize++; - + is.close(); - + cluster.getFileSystem().delete(new Path(file), true); cluster.getFileSystem().delete(new Path("output_map_only"), true); @@ -98,7 +98,7 @@ public class TestCounters { JobGraph jg = pigStats.getJobGraph(); Iterator<JobStats> iter = jg.iterator(); while (iter.hasNext()) { - JobStats js = iter.next(); + MRJobStats js = (MRJobStats) iter.next(); System.out.println("Map input records : " + js.getMapInputRecords()); assertEquals(MAX, js.getMapInputRecords()); @@ -123,20 +123,20 @@ public class TestCounters { count ++; } pw.close(); - PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties()); + PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); pigServer.registerQuery("a = load '" + file + "';"); pigServer.registerQuery("b = filter a by $0 > 50;"); pigServer.registerQuery("c = foreach b generate $0 - 50;"); ExecJob job = pigServer.store("c", "output_map_only", "BinStorage"); PigStats pigStats = job.getStatistics(); - + InputStream is = FileLocalizer.open(FileLocalizer.fullPath( "output_map_only", pigServer.getPigContext()), pigServer.getPigContext()); long filesize = 0; while(is.read() != -1) filesize++; - + is.close(); cluster.getFileSystem().delete(new Path(file), true); @@ -149,8 +149,8 @@ public class TestCounters { JobGraph jp = pigStats.getJobGraph(); Iterator<JobStats> iter = jp.iterator(); while (iter.hasNext()) { - JobStats js = iter.next(); - + MRJobStats js = (MRJobStats) iter.next(); + System.out.println("Map input records : " + js.getMapInputRecords()); assertEquals(MAX, js.getMapInputRecords()); System.out.println("Map output records : " + js.getMapOutputRecords()); @@ -158,7 +158,7 @@ public class TestCounters { assertEquals(0, js.getReduceInputRecords()); assertEquals(0, js.getReduceOutputRecords()); } - + System.out.println("Hdfs bytes written : " + pigStats.getBytesWritten()); assertEquals(filesize, pigStats.getBytesWritten()); } @@ -183,7 +183,7 @@ public class TestCounters { if(nos[i] > 0) count ++; } - PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties()); + PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); pigServer.registerQuery("a = load '" + file + "';"); pigServer.registerQuery("b = group a by $0;"); pigServer.registerQuery("c = foreach b generate group;"); @@ -195,7 +195,7 @@ public class TestCounters { long filesize = 0; while(is.read() != -1) filesize++; - + is.close(); cluster.getFileSystem().delete(new Path(file), true); @@ -208,7 +208,7 @@ public class TestCounters { JobGraph jp = pigStats.getJobGraph(); Iterator<JobStats> iter = jp.iterator(); while (iter.hasNext()) { - JobStats js = iter.next(); + MRJobStats js = (MRJobStats) iter.next(); System.out.println("Map input records : " + js.getMapInputRecords()); assertEquals(MAX, js.getMapInputRecords()); System.out.println("Map output records : " + js.getMapOutputRecords()); @@ -242,7 +242,7 @@ public class TestCounters { if(nos[i] > 0) count ++; } - PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties()); + PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); pigServer.registerQuery("a = load '" + file + "';"); pigServer.registerQuery("b = group a by $0;"); pigServer.registerQuery("c = foreach b generate group;"); @@ -253,9 +253,9 @@ public class TestCounters { pigServer.getPigContext()), pigServer.getPigContext()); long filesize = 0; while(is.read() != -1) filesize++; - + is.close(); - + cluster.getFileSystem().delete(new Path(file), true); cluster.getFileSystem().delete(new Path("output"), true); @@ -266,7 +266,7 @@ public class TestCounters { JobGraph jp = pigStats.getJobGraph(); Iterator<JobStats> iter = jp.iterator(); while (iter.hasNext()) { - JobStats js = iter.next(); + MRJobStats js = (MRJobStats) iter.next(); System.out.println("Map input records : " + js.getMapInputRecords()); assertEquals(MAX, js.getMapInputRecords()); System.out.println("Map output records : " + js.getMapOutputRecords()); @@ -300,7 +300,7 @@ public class TestCounters { if(nos[i] > 0) count ++; } - PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties()); + PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); pigServer.registerQuery("a = load '" + file + "';"); pigServer.registerQuery("b = group a by $0;"); pigServer.registerQuery("c = foreach b generate group, SUM(a.$1);"); @@ -311,20 +311,20 @@ public class TestCounters { pigServer.getPigContext()), pigServer.getPigContext()); long filesize = 0; while(is.read() != -1) filesize++; - + is.close(); - + cluster.getFileSystem().delete(new Path(file), true); cluster.getFileSystem().delete(new Path("output"), true); System.out.println("============================================"); System.out.println("Test case MapCombineReduce"); System.out.println("============================================"); - + JobGraph jp = pigStats.getJobGraph(); Iterator<JobStats> iter = jp.iterator(); while (iter.hasNext()) { - JobStats js = iter.next(); + MRJobStats js = (MRJobStats) iter.next(); System.out.println("Map input records : " + js.getMapInputRecords()); assertEquals(MAX, js.getMapInputRecords()); System.out.println("Map output records : " + js.getMapOutputRecords()); @@ -337,7 +337,7 @@ public class TestCounters { System.out.println("Hdfs bytes written : " + pigStats.getBytesWritten()); assertEquals(filesize, pigStats.getBytesWritten()); } - + @Test public void testMapCombineReduceBinStorage() throws IOException, ExecException { int count = 0; @@ -358,20 +358,20 @@ public class TestCounters { if(nos[i] > 0) count ++; } - PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties()); + PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); pigServer.registerQuery("a = load '" + file + "';"); pigServer.registerQuery("b = group a by $0;"); pigServer.registerQuery("c = foreach b generate group, SUM(a.$1);"); ExecJob job = pigServer.store("c", "output", "BinStorage"); PigStats pigStats = job.getStatistics(); - + InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output", pigServer.getPigContext()), pigServer.getPigContext()); long filesize = 0; while(is.read() != -1) filesize++; - + is.close(); cluster.getFileSystem().delete(new Path(file), true); cluster.getFileSystem().delete(new Path("output"), true); @@ -379,11 +379,11 @@ public class TestCounters { System.out.println("============================================"); System.out.println("Test case MapCombineReduce"); System.out.println("============================================"); - + JobGraph jp = pigStats.getJobGraph(); Iterator<JobStats> iter = jp.iterator(); while (iter.hasNext()) { - JobStats js = iter.next(); + MRJobStats js = (MRJobStats) iter.next(); System.out.println("Map input records : " + js.getMapInputRecords()); assertEquals(MAX, js.getMapInputRecords()); System.out.println("Map output records : " + js.getMapOutputRecords()); @@ -399,8 +399,6 @@ public class TestCounters { @Test public void testMultipleMRJobs() throws IOException, ExecException { - Assume.assumeTrue("Skip this test for TEZ. Assert is done only for first MR job", - Util.isMapredExecType(cluster.getExecType())); int count = 0; PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file)); int [] nos = new int[10]; @@ -415,38 +413,38 @@ public class TestCounters { } pw.close(); - for(int i = 0; i < 10; i++) { + for(int i = 0; i < 10; i++) { if(nos[i] > 0) count ++; } - PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties()); + PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); pigServer.registerQuery("a = load '" + file + "';"); pigServer.registerQuery("b = order a by $0;"); pigServer.registerQuery("c = group b by $0;"); pigServer.registerQuery("d = foreach c generate group, SUM(b.$1);"); ExecJob job = pigServer.store("d", "output"); PigStats pigStats = job.getStatistics(); - + InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output", pigServer.getPigContext()), pigServer.getPigContext()); long filesize = 0; while(is.read() != -1) filesize++; - + is.close(); - + cluster.getFileSystem().delete(new Path(file), true); cluster.getFileSystem().delete(new Path("output"), true); - + System.out.println("============================================"); System.out.println("Test case MultipleMRJobs"); System.out.println("============================================"); - + JobGraph jp = pigStats.getJobGraph(); - JobStats js = (JobStats)jp.getSinks().get(0); - + MRJobStats js = (MRJobStats)jp.getSinks().get(0); + System.out.println("Job id: " + js.getName()); System.out.println(jp.toString()); - + System.out.println("Map input records : " + js.getMapInputRecords()); assertEquals(MAX, js.getMapInputRecords()); System.out.println("Map output records : " + js.getMapOutputRecords()); @@ -455,12 +453,12 @@ public class TestCounters { assertEquals(count, js.getReduceInputRecords()); System.out.println("Reduce output records : " + js.getReduceOutputRecords()); assertEquals(count, js.getReduceOutputRecords()); - + System.out.println("Hdfs bytes written : " + js.getHdfsBytesWritten()); assertEquals(filesize, js.getHdfsBytesWritten()); } - + @Test public void testMapOnlyMultiQueryStores() throws Exception { PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file)); @@ -469,8 +467,8 @@ public class TestCounters { pw.println(t); } pw.close(); - - PigServer pigServer = new PigServer(cluster.getExecType(), + + PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); pigServer.setBatchOn(); pigServer.registerQuery("a = load '" + file + "';"); @@ -481,22 +479,22 @@ public class TestCounters { List<ExecJob> jobs = pigServer.executeBatch(); PigStats stats = jobs.get(0).getStatistics(); assertTrue(stats.getOutputLocations().size() == 2); - + cluster.getFileSystem().delete(new Path(file), true); cluster.getFileSystem().delete(new Path("/tmp/outout1"), true); cluster.getFileSystem().delete(new Path("/tmp/outout2"), true); - JobStats js = (JobStats)stats.getJobGraph().getSinks().get(0); - + MRJobStats js = (MRJobStats)stats.getJobGraph().getSinks().get(0); + Map<String, Long> entry = js.getMultiStoreCounters(); long counter = 0; for (Long val : entry.values()) { counter += val; } - - assertEquals(MAX, counter); - } - + + assertEquals(MAX, counter); + } + @Test public void testMultiQueryStores() throws Exception { int[] nums = new int[100]; @@ -507,13 +505,13 @@ public class TestCounters { nums[t]++; } pw.close(); - + int groups = 0; for (int i : nums) { if (i > 0) groups++; } - - PigServer pigServer = new PigServer(cluster.getExecType(), + + PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); pigServer.setBatchOn(); pigServer.registerQuery("a = load '" + file + "';"); @@ -527,29 +525,29 @@ public class TestCounters { pigServer.registerQuery("store g into '/tmp/outout2';"); List<ExecJob> jobs = pigServer.executeBatch(); PigStats stats = jobs.get(0).getStatistics(); - + assertTrue(stats.getOutputLocations().size() == 2); - + cluster.getFileSystem().delete(new Path(file), true); cluster.getFileSystem().delete(new Path("/tmp/outout1"), true); cluster.getFileSystem().delete(new Path("/tmp/outout2"), true); - JobStats js = (JobStats)stats.getJobGraph().getSinks().get(0); - + MRJobStats js = (MRJobStats)stats.getJobGraph().getSinks().get(0); + Map<String, Long> entry = js.getMultiStoreCounters(); long counter = 0; for (Long val : entry.values()) { counter += val; } - - assertEquals(groups, counter); - } - - /* + + assertEquals(groups, counter); + } + + /* * IMPORTANT NOTE: * COMMENTED OUT BECAUSE COUNTERS DO NOT CURRENTLY WORK IN LOCAL MODE - * SEE PIG-1286 - UNCOMMENT WHEN IT IS FIXED - */ + */ // @Test // public void testLocal() throws IOException, ExecException { // int count = 0; @@ -568,7 +566,7 @@ public class TestCounters { // } // pw.close(); // -// for(int i = 0; i < 10; i++) +// for(int i = 0; i < 10; i++) // if(nos[i] > 0) // count ++; // @@ -582,56 +580,56 @@ public class TestCounters { // pigServer.registerQuery("c = group b by $0;"); // pigServer.registerQuery("d = foreach c generate group, SUM(b.$1);"); // PigStats pigStats = pigServer.store("d", "file://" + out.getAbsolutePath()).getStatistics(); -// InputStream is = FileLocalizer.open(FileLocalizer.fullPath(out.getAbsolutePath(), pigServer.getPigContext()), cluster.getExecType(), pigServer.getPigContext().getDfs()); +// InputStream is = FileLocalizer.open(FileLocalizer.fullPath(out.getAbsolutePath(), pigServer.getPigContext()), ExecType.MAPREDUCE, pigServer.getPigContext().getDfs()); // long filesize = 0; // while(is.read() != -1) filesize++; -// +// // is.close(); // out.delete(); -// +// // //Map<String, Map<String, String>> stats = pigStats.getPigStats(); -// +// // assertEquals(10, pigStats.getRecordsWritten()); // assertEquals(110, pigStats.getBytesWritten()); // // } @Test - public void testJoinInputCounters() throws Exception { + public void testJoinInputCounters() throws Exception { testInputCounters("join"); } - + @Test - public void testCogroupInputCounters() throws Exception { + public void testCogroupInputCounters() throws Exception { testInputCounters("cogroup"); } - + @Test - public void testSkewedInputCounters() throws Exception { + public void testSkewedInputCounters() throws Exception { testInputCounters("skewed"); } - + @Test - public void testSelfJoinInputCounters() throws Exception { + public void testSelfJoinInputCounters() throws Exception { testInputCounters("self-join"); } - + private static boolean multiInputCreated = false; - + private static int count = 0; - - private void testInputCounters(String keyword) throws Exception { + + private void testInputCounters(String keyword) throws Exception { String file1 = "multi-input1.txt"; String file2 = "multi-input2.txt"; - + String output = keyword; - + if (keyword.equals("self-join")) { file2 = file1; keyword = "join"; } - - final int MAX_NUM_RECORDS = 100; + + final int MAX_NUM_RECORDS = 100; if (!multiInputCreated) { PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file1)); for (int i = 0; i < MAX_NUM_RECORDS; i++) { @@ -639,7 +637,7 @@ public class TestCounters { pw.println(t); } pw.close(); - + PrintWriter pw2 = new PrintWriter(Util.createInputFile(cluster, file2)); for (int i = 0; i < MAX_NUM_RECORDS; i++) { int t = r.nextInt(100); @@ -651,8 +649,8 @@ public class TestCounters { pw2.close(); multiInputCreated = true; } - - PigServer pigServer = new PigServer(cluster.getExecType(), + + PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); pigServer.setBatchOn(); pigServer.registerQuery("a = load '" + file1 + "';"); @@ -663,7 +661,7 @@ public class TestCounters { pigServer.registerQuery("c = join a by $0, b by $0 using 'skewed';"); } ExecJob job = pigServer.store("c", output + "_output"); - + PigStats stats = job.getStatistics(); assertTrue(stats.isSuccessful()); List<InputStats> inputs = stats.getInputStats(); @@ -682,46 +680,4 @@ public class TestCounters { } } } - - @Test - public void testSplitUnionOutputCounters() throws Exception { - PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties()); - PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, "splitunion-input")); - for (int i = 0; i < 10; i++) { - pw.println(i); - } - pw.close(); - String query = - "a = load 'splitunion-input';" + - "split a into b if $0 < 5, c otherwise;" + - "d = union b, c;"; - - pigServer.registerQuery(query); - - ExecJob job = pigServer.store("d", "splitunion-output-0", "PigStorage"); - PigStats stats1 = job.getStatistics(); - - query = - "a = load 'splitunion-input';" + - "split a into b if $0 < 3, c if $0 > 2 and $0 < 6, d if $0 > 5;" + - "e = distinct d;" + - "f = union b, c, e;"; - - pigServer.registerQuery(query); - - job = pigServer.store("f", "splitunion-output-1", "PigStorage"); - PigStats stats2 = job.getStatistics(); - - PigStats[] pigStats = new PigStats[]{stats1, stats2}; - for (int i = 0; i < 2; i++) { - PigStats stats = pigStats[i]; - assertTrue(stats.isSuccessful()); - List<OutputStats> outputs = stats.getOutputStats(); - assertEquals(1, outputs.size()); - OutputStats output = outputs.get(0); - assertEquals("splitunion-output-" + i, output.getName()); - assertEquals(10, output.getNumberRecords()); - assertEquals(20, output.getBytes()); - } - } }
Modified: pig/branches/spark/test/org/apache/pig/test/TestDataBag.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestDataBag.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestDataBag.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestDataBag.java Fri Feb 24 03:34:37 2017 @@ -17,36 +17,17 @@ */ package org.apache.pig.test; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; +import java.util.*; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.io.PipedInputStream; import java.io.PipedOutputStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Comparator; -import java.util.Iterator; -import java.util.PriorityQueue; -import java.util.Random; -import java.util.TreeSet; - -import org.apache.pig.data.BagFactory; -import org.apache.pig.data.DataBag; -import org.apache.pig.data.DefaultDataBag; -import org.apache.pig.data.DefaultTuple; -import org.apache.pig.data.DistinctDataBag; -import org.apache.pig.data.InternalCachedBag; -import org.apache.pig.data.InternalDistinctBag; -import org.apache.pig.data.InternalSortedBag; -import org.apache.pig.data.NonSpillableDataBag; -import org.apache.pig.data.SingleTupleBag; -import org.apache.pig.data.SortedDataBag; -import org.apache.pig.data.Tuple; -import org.apache.pig.data.TupleFactory; + + +import org.apache.pig.data.*; import org.apache.pig.impl.util.Spillable; import org.junit.After; import org.junit.Test; @@ -55,7 +36,7 @@ import org.junit.Test; /** * This class will exercise the basic Pig data model and members. It tests for proper behavior in * assignment and comparison, as well as function application. - * + * * @author dnm */ public class TestDataBag { @@ -609,7 +590,7 @@ public class TestDataBag { } mgr.forceSpill(); } - + assertEquals("Size of distinct data bag is incorrect", rightAnswer.size(), b.size()); // Read tuples back, hopefully they come out in the same order. @@ -738,14 +719,14 @@ public class TestDataBag { @Test public void testDefaultBagFactory() throws Exception { BagFactory f = BagFactory.getInstance(); - + DataBag bag = f.newDefaultBag(); DataBag sorted = f.newSortedBag(null); DataBag distinct = f.newDistinctBag(); assertTrue("Expected a default bag", (bag instanceof DefaultDataBag)); assertTrue("Expected a sorted bag", (sorted instanceof SortedDataBag)); - assertTrue("Expected a distinct bag", (distinct instanceof DistinctDataBag)); + assertTrue("Expected a distinct bag", (distinct instanceof DistinctDataBag)); } @Test @@ -775,7 +756,7 @@ public class TestDataBag { try { BagFactory f = BagFactory.getInstance(); } catch (RuntimeException re) { - assertEquals("Expected does not extend BagFactory message", + assertEquals("Expected does not extend BagFactory message", "Provided factory org.apache.pig.test.TestDataBag does not extend BagFactory!", re.getMessage()); caughtIt = true; @@ -794,7 +775,7 @@ public class TestDataBag { BagFactory.resetSelf(); } - + @Test public void testNonSpillableDataBagEquals1() throws Exception { String[][] tupleContents = new String[][] {{"a", "b"},{"c", "d" }, { "e", "f"} }; @@ -808,7 +789,7 @@ public class TestDataBag { } assertEquals(bg1, bg2); } - + @Test public void testNonSpillableDataBagEquals2() throws Exception { String[][] tupleContents = new String[][] {{"a", "b"},{"c", "d" }, { "e", "f"} }; @@ -823,7 +804,7 @@ public class TestDataBag { } assertEquals(bg1, bg2); } - + @Test public void testDefaultDataBagEquals1() throws Exception { String[][] tupleContents = new String[][] {{"a", "b"},{"c", "d" }, { "e", "f"} }; @@ -839,7 +820,7 @@ public class TestDataBag { } assertEquals(bg1, bg2); } - + @Test public void testDefaultDataBagEquals2() throws Exception { String[][] tupleContents = new String[][] {{"a", "b"},{"c", "d" }, { "e", "f"} }; @@ -856,35 +837,35 @@ public class TestDataBag { } assertEquals(bg1, bg2); } - - public void testInternalCachedBag() throws Exception { + + public void testInternalCachedBag() throws Exception { // check adding empty tuple DataBag bg0 = new InternalCachedBag(); bg0.add(TupleFactory.getInstance().newTuple()); bg0.add(TupleFactory.getInstance().newTuple()); assertEquals(bg0.size(), 2); - + // check equal of bags DataBag bg1 = new InternalCachedBag(1, 0.5f); assertEquals(bg1.size(), 0); - + String[][] tupleContents = new String[][] {{"a", "b"},{"c", "d" }, { "e", "f"} }; for (int i = 0; i < tupleContents.length; i++) { bg1.add(Util.createTuple(tupleContents[i])); } - + // check size, and isSorted(), isDistinct() assertEquals(bg1.size(), 3); assertFalse(bg1.isSorted()); assertFalse(bg1.isDistinct()); - + tupleContents = new String[][] {{"c", "d" }, {"a", "b"},{ "e", "f"} }; DataBag bg2 = new InternalCachedBag(1, 0.5f); for (int i = 0; i < tupleContents.length; i++) { bg2.add(Util.createTuple(tupleContents[i])); } assertEquals(bg1, bg2); - + // check bag with data written to disk DataBag bg3 = new InternalCachedBag(1, 0.0f); tupleContents = new String[][] {{ "e", "f"}, {"c", "d" }, {"a", "b"}}; @@ -892,7 +873,7 @@ public class TestDataBag { bg3.add(Util.createTuple(tupleContents[i])); } assertEquals(bg1, bg3); - + // check iterator Iterator<Tuple> iter = bg3.iterator(); DataBag bg4 = new InternalCachedBag(1, 0.0f); @@ -900,7 +881,7 @@ public class TestDataBag { bg4.add(iter.next()); } assertEquals(bg3, bg4); - + // call iterator methods with irregular order iter = bg3.iterator(); assertTrue(iter.hasNext()); @@ -913,46 +894,46 @@ public class TestDataBag { assertFalse(iter.hasNext()); assertFalse(iter.hasNext()); assertEquals(bg3, bg5); - - + + bg4.clear(); - assertEquals(bg4.size(), 0); + assertEquals(bg4.size(), 0); } - - public void testInternalSortedBag() throws Exception { - + + public void testInternalSortedBag() throws Exception { + // check adding empty tuple DataBag bg0 = new InternalSortedBag(); bg0.add(TupleFactory.getInstance().newTuple()); bg0.add(TupleFactory.getInstance().newTuple()); assertEquals(bg0.size(), 2); - + // check equal of bags DataBag bg1 = new InternalSortedBag(); assertEquals(bg1.size(), 0); - + String[][] tupleContents = new String[][] {{ "e", "f"}, {"a", "b"}, {"c", "d" }}; for (int i = 0; i < tupleContents.length; i++) { bg1.add(Util.createTuple(tupleContents[i])); } - + // check size, and isSorted(), isDistinct() assertEquals(bg1.size(), 3); assertTrue(bg1.isSorted()); assertFalse(bg1.isDistinct()); - + tupleContents = new String[][] {{"c", "d" }, {"a", "b"},{ "e", "f"} }; DataBag bg2 = new InternalSortedBag(); for (int i = 0; i < tupleContents.length; i++) { bg2.add(Util.createTuple(tupleContents[i])); } assertEquals(bg1, bg2); - + Iterator<Tuple> iter = bg1.iterator(); iter.next().equals(Util.createTuple(new String[] {"a", "b"})); iter.next().equals(Util.createTuple(new String[] {"c", "d"})); iter.next().equals(Util.createTuple(new String[] {"e", "f"})); - + // check bag with data written to disk DataBag bg3 = new InternalSortedBag(1, 0.0f, null); tupleContents = new String[][] {{ "e", "f"}, {"c", "d" }, {"a", "b"}}; @@ -960,17 +941,17 @@ public class TestDataBag { bg3.add(Util.createTuple(tupleContents[i])); } assertEquals(bg1, bg3); - + iter = bg3.iterator(); iter.next().equals(Util.createTuple(new String[] {"a", "b"})); iter.next().equals(Util.createTuple(new String[] {"c", "d"})); - iter.next().equals(Util.createTuple(new String[] {"e", "f"})); - + iter.next().equals(Util.createTuple(new String[] {"e", "f"})); + // call iterator methods with irregular order iter = bg3.iterator(); assertTrue(iter.hasNext()); assertTrue(iter.hasNext()); - + DataBag bg4 = new InternalSortedBag(1, 0.0f, null); bg4.add(iter.next()); bg4.add(iter.next()); @@ -978,21 +959,21 @@ public class TestDataBag { bg4.add(iter.next()); assertFalse(iter.hasNext()); assertFalse(iter.hasNext()); - assertEquals(bg3, bg4); - + assertEquals(bg3, bg4); + // check clear bg3.clear(); assertEquals(bg3.size(), 0); - + // test with all data spill out - DataBag bg5 = new InternalSortedBag(); + DataBag bg5 = new InternalSortedBag(); for(int j=0; j<3; j++) { for (int i = 0; i < tupleContents.length; i++) { bg5.add(Util.createTuple(tupleContents[i])); - } + } bg5.spill(); } - + assertEquals(bg5.size(), 9); iter = bg5.iterator(); for(int i=0; i<3; i++) { @@ -1002,21 +983,21 @@ public class TestDataBag { iter.next().equals(Util.createTuple(new String[] {"c", "d"})); } for(int i=0; i<3; i++) { - iter.next().equals(Util.createTuple(new String[] {"e", "f"})); + iter.next().equals(Util.createTuple(new String[] {"e", "f"})); } - + // test with most data spill out, with some data in memory // and merge of spill files - DataBag bg6 = new InternalSortedBag(); + DataBag bg6 = new InternalSortedBag(); for(int j=0; j<104; j++) { for (int i = 0; i < tupleContents.length; i++) { bg6.add(Util.createTuple(tupleContents[i])); - } + } if (j != 103) { bg6.spill(); } } - + assertEquals(bg6.size(), 104*3); iter = bg6.iterator(); for(int i=0; i<104; i++) { @@ -1026,55 +1007,55 @@ public class TestDataBag { iter.next().equals(Util.createTuple(new String[] {"c", "d"})); } for(int i=0; i<104; i++) { - iter.next().equals(Util.createTuple(new String[] {"e", "f"})); + iter.next().equals(Util.createTuple(new String[] {"e", "f"})); } - + // check two implementation of sorted bag can compare correctly - DataBag bg7 = new SortedDataBag(null); + DataBag bg7 = new SortedDataBag(null); for(int j=0; j<104; j++) { for (int i = 0; i < tupleContents.length; i++) { bg7.add(Util.createTuple(tupleContents[i])); - } + } if (j != 103) { bg7.spill(); } } assertEquals(bg6, bg7); } - - public void testInternalDistinctBag() throws Exception { + + public void testInternalDistinctBag() throws Exception { // check adding empty tuple DataBag bg0 = new InternalDistinctBag(); bg0.add(TupleFactory.getInstance().newTuple()); bg0.add(TupleFactory.getInstance().newTuple()); assertEquals(bg0.size(), 1); - + // check equal of bags DataBag bg1 = new InternalDistinctBag(); assertEquals(bg1.size(), 0); - + String[][] tupleContents = new String[][] {{ "e", "f"}, {"a", "b"}, {"e", "d" }, {"a", "b"}, {"e", "f"}}; for (int i = 0; i < tupleContents.length; i++) { bg1.add(Util.createTuple(tupleContents[i])); } - + // check size, and isSorted(), isDistinct() assertEquals(bg1.size(), 3); assertFalse(bg1.isSorted()); assertTrue(bg1.isDistinct()); - + tupleContents = new String[][] {{"a", "b" }, {"e", "d"}, {"e", "d"}, { "e", "f"} }; DataBag bg2 = new InternalDistinctBag(); for (int i = 0; i < tupleContents.length; i++) { bg2.add(Util.createTuple(tupleContents[i])); } assertEquals(bg1, bg2); - + Iterator<Tuple> iter = bg1.iterator(); iter.next().equals(Util.createTuple(new String[] {"a", "b"})); iter.next().equals(Util.createTuple(new String[] {"c", "d"})); iter.next().equals(Util.createTuple(new String[] {"e", "f"})); - + // check bag with data written to disk DataBag bg3 = new InternalDistinctBag(1, 0.0f); tupleContents = new String[][] {{ "e", "f"}, {"a", "b"}, {"e", "d" }, {"a", "b"}, {"e", "f"}}; @@ -1083,13 +1064,13 @@ public class TestDataBag { } assertEquals(bg2, bg3); assertEquals(bg3.size(), 3); - - + + // call iterator methods with irregular order iter = bg3.iterator(); assertTrue(iter.hasNext()); assertTrue(iter.hasNext()); - + DataBag bg4 = new InternalDistinctBag(1, 0.0f); bg4.add(iter.next()); bg4.add(iter.next()); @@ -1097,73 +1078,73 @@ public class TestDataBag { bg4.add(iter.next()); assertFalse(iter.hasNext()); assertFalse(iter.hasNext()); - assertEquals(bg3, bg4); - + assertEquals(bg3, bg4); + // check clear bg3.clear(); assertEquals(bg3.size(), 0); - + // test with all data spill out - DataBag bg5 = new InternalDistinctBag(); + DataBag bg5 = new InternalDistinctBag(); for(int j=0; j<3; j++) { for (int i = 0; i < tupleContents.length; i++) { bg5.add(Util.createTuple(tupleContents[i])); - } + } bg5.spill(); } - + assertEquals(bg5.size(), 3); - - + + // test with most data spill out, with some data in memory // and merge of spill files - DataBag bg6 = new InternalDistinctBag(); + DataBag bg6 = new InternalDistinctBag(); for(int j=0; j<104; j++) { for (int i = 0; i < tupleContents.length; i++) { bg6.add(Util.createTuple(tupleContents[i])); - } + } if (j != 103) { bg6.spill(); } } - - assertEquals(bg6.size(), 3); - + + assertEquals(bg6.size(), 3); + // check two implementation of sorted bag can compare correctly - DataBag bg7 = new DistinctDataBag(); + DataBag bg7 = new DistinctDataBag(); for(int j=0; j<104; j++) { for (int i = 0; i < tupleContents.length; i++) { bg7.add(Util.createTuple(tupleContents[i])); - } + } if (j != 103) { bg7.spill(); } } assertEquals(bg6, bg7); } - + // See PIG-1231 @Test public void testDataBagIterIdempotent() throws Exception { DataBag bg0 = new DefaultDataBag(); processDataBag(bg0, true); - + DataBag bg1 = new DistinctDataBag(); processDataBag(bg1, true); - + DataBag bg2 = new InternalDistinctBag(); processDataBag(bg2, true); - + DataBag bg3 = new InternalSortedBag(); processDataBag(bg3, true); - + DataBag bg4 = new SortedDataBag(null); processDataBag(bg4, true); - + DataBag bg5 = new InternalCachedBag(0, 0); processDataBag(bg5, false); } - + // See PIG-1285 @Test public void testSerializeSingleTupleBag() throws Exception { @@ -1178,7 +1159,7 @@ public class TestDataBag { dfBag.readFields(dis); assertTrue(dfBag.equals(stBag)); } - + // See PIG-2550 static class MyCustomTuple extends DefaultTuple { private static final long serialVersionUID = 8156382697467819543L; @@ -1203,23 +1184,7 @@ public class TestDataBag { Tuple t2 = iter.next(); assertTrue(t2.equals(t)); } - - // See PIG-4260 - @Test - public void testSpillArrayBackedList() throws Exception { - Tuple[] tuples = new Tuple[2]; - tuples[0] = TupleFactory.getInstance().newTuple(1); - tuples[0].set(0, "first"); - tuples[1] = TupleFactory.getInstance().newTuple(1); - tuples[1].set(0, "second"); - DefaultDataBag bag = new DefaultDataBag(Arrays.asList(tuples)); - bag.spill(); - Iterator<Tuple> iter = bag.iterator(); - assertEquals(tuples[0], iter.next()); - assertEquals(tuples[1], iter.next()); - assertFalse(iter.hasNext()); - } - + void processDataBag(DataBag bg, boolean doSpill) { Tuple t = TupleFactory.getInstance().newTuple(new Integer(0)); bg.add(t); @@ -1229,7 +1194,7 @@ public class TestDataBag { assertTrue(iter.hasNext()); iter.next(); assertFalse(iter.hasNext()); - assertFalse("hasNext should be idempotent", iter.hasNext()); + assertFalse("hasNext should be idempotent", iter.hasNext()); } } Modified: pig/branches/spark/test/org/apache/pig/test/TestDivide.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestDivide.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestDivide.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestDivide.java Fri Feb 24 03:34:37 2017 @@ -20,9 +20,6 @@ package org.apache.pig.test; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; -import java.math.BigDecimal; -import java.math.MathContext; -import java.math.RoundingMode; import java.util.Map; import java.util.Random; @@ -56,7 +53,7 @@ public class TestDivide { public void testOperator() throws ExecException { // int TRIALS = 10; byte[] types = { DataType.BAG, DataType.BOOLEAN, DataType.BYTEARRAY, DataType.CHARARRAY, - DataType.DOUBLE, DataType.FLOAT, DataType.INTEGER, DataType.LONG, DataType.BIGDECIMAL, + DataType.DOUBLE, DataType.FLOAT, DataType.INTEGER, DataType.LONG, DataType.DATETIME, DataType.MAP, DataType.TUPLE }; // Map<Byte,String> map = GenRandomData.genTypeToNameMap(); System.out.println("Testing DIVIDE operator"); @@ -253,33 +250,6 @@ public class TestDivide { assertEquals(null, (Long)resl.result); break; } - case DataType.BIGDECIMAL: { - MathContext mc = new MathContext(Divide.BIGDECIMAL_MINIMAL_SCALE, RoundingMode.HALF_UP); - BigDecimal inpf1 = new BigDecimal(r.nextDouble(),mc); - BigDecimal inpf2 = new BigDecimal(r.nextDouble(),mc); - lt.setValue(inpf1); - rt.setValue(inpf2); - Result resf = op.getNextBigDecimal(); - BigDecimal expected = inpf1.divide(inpf2, 2 * Divide.BIGDECIMAL_MINIMAL_SCALE + 1, RoundingMode.HALF_UP); - assertEquals(expected, (BigDecimal)resf.result); - - // test with null in lhs - lt.setValue(null); - rt.setValue(inpf2); - resf = op.getNextBigDecimal(); - assertEquals(null, (BigDecimal)resf.result); - // test with null in rhs - lt.setValue(inpf1); - rt.setValue(null); - resf = op.getNextBigDecimal(); - assertEquals(null, (BigDecimal)resf.result); - // test divide by 0 - lt.setValue(inpf1); - rt.setValue(new BigDecimal(0.0f,mc)); - resf = op.getNextBigDecimal(); - assertEquals(null, (BigDecimal)resf.result); - break; - } case DataType.DATETIME: DateTime inpdt1 = new DateTime(r.nextLong()); DateTime inpdt2 = new DateTime(r.nextLong()); Modified: pig/branches/spark/test/org/apache/pig/test/TestEmptyInputDir.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestEmptyInputDir.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestEmptyInputDir.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestEmptyInputDir.java Fri Feb 24 03:34:37 2017 @@ -23,13 +23,13 @@ import static org.junit.Assert.assertTru import java.io.File; import java.io.FileWriter; -import java.io.IOException; import java.io.PrintWriter; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.pig.PigRunner; +import org.apache.pig.tools.pigstats.JobStats; import org.apache.pig.tools.pigstats.PigStats; import org.apache.pig.tools.pigstats.mapreduce.MRJobStats; import org.junit.AfterClass; @@ -38,15 +38,16 @@ import org.junit.Test; public class TestEmptyInputDir { - private static MiniGenericCluster cluster = MiniGenericCluster.buildCluster(); + private static MiniCluster cluster; private static final String EMPTY_DIR = "emptydir"; private static final String INPUT_FILE = "input"; private static final String OUTPUT_FILE = "output"; private static final String PIG_FILE = "test.pig"; - + @BeforeClass public static void setUpBeforeClass() throws Exception { + cluster = MiniCluster.buildCluster(); FileSystem fs = cluster.getFileSystem(); if (!fs.mkdirs(new Path(EMPTY_DIR))) { throw new Exception("failed to create empty dir"); @@ -63,35 +64,7 @@ public class TestEmptyInputDir { public static void tearDownAfterClass() throws Exception { cluster.shutDown(); } - - @Test - public void testGroupBy() throws Exception { - PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE)); - w.println("A = load '" + EMPTY_DIR + "';"); - w.println("B = group A by $0;"); - w.println("store B into '" + OUTPUT_FILE + "';"); - w.close(); - - try { - String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, }; - PigStats stats = PigRunner.run(args, null); - - assertTrue(stats.isSuccessful()); - - // This assert fails on 205 due to MAPREDUCE-3606 - if (Util.isMapredExecType(cluster.getExecType()) - && !Util.isHadoop205() && !Util.isHadoop1_x()) { - MRJobStats js = (MRJobStats) stats.getJobGraph().getSources().get(0); - assertEquals(0, js.getNumberMaps()); - } - - assertEmptyOutputFile(); - } finally { - new File(PIG_FILE).delete(); - Util.deleteFile(cluster, OUTPUT_FILE); - } - } - + @Test public void testSkewedJoin() throws Exception { PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE)); @@ -100,28 +73,31 @@ public class TestEmptyInputDir { w.println("C = join B by $0, A by $0 using 'skewed';"); w.println("store C into '" + OUTPUT_FILE + "';"); w.close(); - + try { - String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, }; + String[] args = { PIG_FILE }; PigStats stats = PigRunner.run(args, null); - + assertTrue(stats.isSuccessful()); - + // the sampler job has zero maps + MRJobStats js = (MRJobStats)stats.getJobGraph().getSources().get(0); + // This assert fails on 205 due to MAPREDUCE-3606 - if (Util.isMapredExecType(cluster.getExecType()) - && !Util.isHadoop205() && !Util.isHadoop1_x()) { - // the sampler job has zero maps - MRJobStats js = (MRJobStats) stats.getJobGraph().getSources().get(0); - assertEquals(0, js.getNumberMaps()); - } - - assertEmptyOutputFile(); + if (!Util.isHadoop205()&&!Util.isHadoop1_x()) + assertEquals(0, js.getNumberMaps()); + + FileSystem fs = cluster.getFileSystem(); + FileStatus status = fs.getFileStatus(new Path(OUTPUT_FILE)); + assertTrue(status.isDir()); + assertEquals(0, status.getLen()); + // output directory isn't empty + assertTrue(fs.listStatus(status.getPath()).length > 0); } finally { new File(PIG_FILE).delete(); Util.deleteFile(cluster, OUTPUT_FILE); } } - + @Test public void testMergeJoin() throws Exception { PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE)); @@ -130,28 +106,32 @@ public class TestEmptyInputDir { w.println("C = join A by $0, B by $0 using 'merge';"); w.println("store C into '" + OUTPUT_FILE + "';"); w.close(); - + try { - String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, }; + String[] args = { PIG_FILE }; PigStats stats = PigRunner.run(args, null); - - assertTrue(stats.isSuccessful()); - + + assertTrue(stats.isSuccessful()); + // the indexer job has zero maps + MRJobStats js = (MRJobStats)stats.getJobGraph().getSources().get(0); + // This assert fails on 205 due to MAPREDUCE-3606 - if (Util.isMapredExecType(cluster.getExecType()) - && !Util.isHadoop205() && !Util.isHadoop1_x()) { - // the indexer job has zero maps - MRJobStats js = (MRJobStats) stats.getJobGraph().getSources().get(0); - assertEquals(0, js.getNumberMaps()); - } - - assertEmptyOutputFile(); + if (!Util.isHadoop205()&&!Util.isHadoop1_x()) + assertEquals(0, js.getNumberMaps()); + + FileSystem fs = cluster.getFileSystem(); + FileStatus status = fs.getFileStatus(new Path(OUTPUT_FILE)); + assertTrue(status.isDir()); + assertEquals(0, status.getLen()); + + // output directory isn't empty + assertTrue(fs.listStatus(status.getPath()).length > 0); } finally { new File(PIG_FILE).delete(); Util.deleteFile(cluster, OUTPUT_FILE); } } - + @Test public void testFRJoin() throws Exception { PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE)); @@ -160,44 +140,55 @@ public class TestEmptyInputDir { w.println("C = join A by $0, B by $0 using 'repl';"); w.println("store C into '" + OUTPUT_FILE + "';"); w.close(); - + try { - String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, }; + String[] args = { PIG_FILE }; PigStats stats = PigRunner.run(args, null); - - assertTrue(stats.isSuccessful()); - + + assertTrue(stats.isSuccessful()); + // the indexer job has zero maps + MRJobStats js = (MRJobStats)stats.getJobGraph().getSources().get(0); + // This assert fails on 205 due to MAPREDUCE-3606 - if (Util.isMapredExecType(cluster.getExecType()) - && !Util.isHadoop205() && !Util.isHadoop1_x()) { - MRJobStats js = (MRJobStats) stats.getJobGraph().getSources().get(0); - assertEquals(0, js.getNumberMaps()); - } - - assertEmptyOutputFile(); + if (!Util.isHadoop205()&&!Util.isHadoop1_x()) + assertEquals(0, js.getNumberMaps()); + + FileSystem fs = cluster.getFileSystem(); + FileStatus status = fs.getFileStatus(new Path(OUTPUT_FILE)); + assertTrue(status.isDir()); + assertEquals(0, status.getLen()); + + // output directory isn't empty + assertTrue(fs.listStatus(status.getPath()).length > 0); } finally { new File(PIG_FILE).delete(); Util.deleteFile(cluster, OUTPUT_FILE); } } - + @Test public void testRegularJoin() throws Exception { PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE)); w.println("A = load '" + INPUT_FILE + "';"); w.println("B = load '" + EMPTY_DIR + "';"); - w.println("C = join B by $0, A by $0 PARALLEL 0;"); + w.println("C = join B by $0, A by $0;"); w.println("store C into '" + OUTPUT_FILE + "';"); w.close(); - + try { - String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, }; + String[] args = { PIG_FILE }; PigStats stats = PigRunner.run(args, null); - - assertTrue(stats.isSuccessful()); - - assertEmptyOutputFile(); - + + assertTrue(stats.isSuccessful()); + + FileSystem fs = cluster.getFileSystem(); + FileStatus status = fs.getFileStatus(new Path(OUTPUT_FILE)); + assertTrue(status.isDir()); + assertEquals(0, status.getLen()); + + // output directory isn't empty + assertTrue(fs.listStatus(status.getPath()).length > 0); + } finally { new File(PIG_FILE).delete(); Util.deleteFile(cluster, OUTPUT_FILE); @@ -212,19 +203,19 @@ public class TestEmptyInputDir { w.println("C = join B by $0 right outer, A by $0;"); w.println("store C into '" + OUTPUT_FILE + "';"); w.close(); - + try { - String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, }; + String[] args = { PIG_FILE }; PigStats stats = PigRunner.run(args, null); - - assertTrue(stats.isSuccessful()); - assertEquals(2, stats.getNumberRecords(OUTPUT_FILE)); + + assertTrue(stats.isSuccessful()); + assertEquals(2, stats.getNumberRecords(OUTPUT_FILE)); } finally { new File(PIG_FILE).delete(); Util.deleteFile(cluster, OUTPUT_FILE); } } - + @Test public void testLeftOuterJoin() throws Exception { PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE)); @@ -233,88 +224,16 @@ public class TestEmptyInputDir { w.println("C = join B by $0 left outer, A by $0;"); w.println("store C into '" + OUTPUT_FILE + "';"); w.close(); - - try { - String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, }; - PigStats stats = PigRunner.run(args, null); - - assertTrue(stats.isSuccessful()); - assertEquals(0, stats.getNumberRecords(OUTPUT_FILE)); - } finally { - new File(PIG_FILE).delete(); - Util.deleteFile(cluster, OUTPUT_FILE); - } - } - - @Test - public void testBloomJoin() throws Exception { - PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE)); - w.println("A = load '" + INPUT_FILE + "' as (x:int);"); - w.println("B = load '" + EMPTY_DIR + "' as (x:int);"); - w.println("C = join B by $0, A by $0 using 'bloom';"); - w.println("D = join A by $0, B by $0 using 'bloom';"); - w.println("store C into '" + OUTPUT_FILE + "';"); - w.println("store D into 'output1';"); - w.close(); - - try { - String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, }; - PigStats stats = PigRunner.run(args, null); - - assertTrue(stats.isSuccessful()); - assertEquals(0, stats.getNumberRecords(OUTPUT_FILE)); - assertEquals(0, stats.getNumberRecords("output1")); - assertEmptyOutputFile(); - } finally { - new File(PIG_FILE).delete(); - Util.deleteFile(cluster, OUTPUT_FILE); - Util.deleteFile(cluster, "output1"); - } - } - - @Test - public void testBloomJoinOuter() throws Exception { - PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE)); - w.println("A = load '" + INPUT_FILE + "' as (x:int);"); - w.println("B = load '" + EMPTY_DIR + "' as (x:int);"); - w.println("C = join B by $0 left outer, A by $0 using 'bloom';"); - w.println("D = join A by $0 left outer, B by $0 using 'bloom';"); - w.println("E = join B by $0 right outer, A by $0 using 'bloom';"); - w.println("F = join A by $0 right outer, B by $0 using 'bloom';"); - w.println("store C into '" + OUTPUT_FILE + "';"); - w.println("store D into 'output1';"); - w.println("store E into 'output2';"); - w.println("store F into 'output3';"); - w.close(); - + try { - String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, }; + String[] args = { PIG_FILE }; PigStats stats = PigRunner.run(args, null); - - assertTrue(stats.isSuccessful()); - assertEquals(0, stats.getNumberRecords(OUTPUT_FILE)); - assertEquals(2, stats.getNumberRecords("output1")); - assertEquals(2, stats.getNumberRecords("output2")); - assertEquals(0, stats.getNumberRecords("output3")); - assertEmptyOutputFile(); + + assertTrue(stats.isSuccessful()); + assertEquals(0, stats.getNumberRecords(OUTPUT_FILE)); } finally { new File(PIG_FILE).delete(); Util.deleteFile(cluster, OUTPUT_FILE); - Util.deleteFile(cluster, "output1"); - Util.deleteFile(cluster, "output2"); - Util.deleteFile(cluster, "output3"); } } - - private void assertEmptyOutputFile() throws IllegalArgumentException, IOException { - FileSystem fs = cluster.getFileSystem(); - FileStatus status = fs.getFileStatus(new Path(OUTPUT_FILE)); - assertTrue(status.isDir()); - assertEquals(0, status.getLen()); - // output directory isn't empty. Has one empty file - FileStatus[] files = fs.listStatus(status.getPath(), Util.getSuccessMarkerPathFilter()); - assertEquals(1, files.length); - assertEquals(0, files[0].getLen()); - assertTrue(files[0].getPath().getName().startsWith("part-")); - } } Modified: pig/branches/spark/test/org/apache/pig/test/TestErrorHandlingStoreFunc.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestErrorHandlingStoreFunc.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestErrorHandlingStoreFunc.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestErrorHandlingStoreFunc.java Fri Feb 24 03:34:37 2017 @@ -200,11 +200,11 @@ public class TestErrorHandlingStoreFunc private void updatePigProperties(boolean allowErrors, long minErrors, double errorThreshold) { Properties properties = pigServer.getPigContext().getProperties(); - properties.put(PigConfiguration.PIG_ERROR_HANDLING_ENABLED, + properties.put(PigConfiguration.PIG_ALLOW_STORE_ERRORS, Boolean.toString(allowErrors)); - properties.put(PigConfiguration.PIG_ERROR_HANDLING_MIN_ERROR_RECORDS, + properties.put(PigConfiguration.PIG_ERRORS_MIN_RECORDS, Long.toString(minErrors)); - properties.put(PigConfiguration.PIG_ERROR_HANDLING_THRESHOLD_PERCENT, + properties.put(PigConfiguration.PIG_ERROR_THRESHOLD_PERCENT, Double.toString(errorThreshold)); } } Modified: pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java Fri Feb 24 03:34:37 2017 @@ -291,7 +291,7 @@ public class TestEvalPipeline { myMap.put("long", new Long(1)); myMap.put("float", new Float(1.0)); myMap.put("double", new Double(1.0)); - myMap.put("dba", new DataByteArray(new String("1234").getBytes())); + myMap.put("dba", new DataByteArray(new String("bytes").getBytes())); myMap.put("map", mapInMap); myMap.put("tuple", tuple); myMap.put("bag", bag); @@ -794,31 +794,32 @@ public class TestEvalPipeline { } @Test - public void testMapUDFWithImplicitTypeCast() throws Exception{ + public void testMapUDFfail() throws Exception{ int LOOP_COUNT = 2; File tmpFile = Util.createTempFileDelOnExit("test", "txt"); PrintStream ps = new PrintStream(new FileOutputStream(tmpFile)); for(int i = 0; i < LOOP_COUNT; i++) { - ps.println(i); + for(int j=0;j<LOOP_COUNT;j+=2){ + ps.println(i+"\t"+j); + ps.println(i+"\t"+j); + } } ps.close(); pigServer.registerQuery("A = LOAD '" + Util.generateURI(tmpFile.toString(), pigContext) + "';"); pigServer.registerQuery("B = foreach A generate " + MapUDF.class.getName() + "($0) as mymap;"); //the argument does not matter - String query = "C = foreach B generate mymap#'dba' * 10; "; + String query = "C = foreach B {" + + "generate mymap#'dba' * 10;" + + "};"; pigServer.registerQuery(query); - - Iterator<Tuple> iter = pigServer.openIterator("C"); - if(!iter.hasNext()) Assert.fail("No output found"); - int numIdentity = 0; - while(iter.hasNext()){ - Tuple t = iter.next(); - Assert.assertEquals(new Integer(12340), (Integer)t.get(0)); - ++numIdentity; + try { + pigServer.openIterator("C"); + Assert.fail("Error expected."); + } catch (Exception e) { + e.getMessage().contains("Cannot determine"); } - Assert.assertEquals(LOOP_COUNT, numIdentity); } @Test Modified: pig/branches/spark/test/org/apache/pig/test/TestFindQuantiles.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestFindQuantiles.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestFindQuantiles.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestFindQuantiles.java Fri Feb 24 03:34:37 2017 @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.Random; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.DiscreteProbabilitySampleGenerator; import org.apache.pig.data.BagFactory; import org.apache.pig.data.DataBag; import org.apache.pig.data.InternalMap; @@ -37,10 +38,10 @@ import org.apache.pig.impl.builtin.FindQ import org.junit.Test; public class TestFindQuantiles { - + private static TupleFactory tFact = TupleFactory.getInstance(); private static final float epsilon = 0.0001f; - + @Test public void testFindQuantiles() throws Exception { final int numSamples = 97778; @@ -49,7 +50,7 @@ public class TestFindQuantiles { System.out.println("sum: " + sum); assertTrue(sum > (1-epsilon) && sum < (1+epsilon)); } - + @Test public void testFindQuantiles2() throws Exception { final int numSamples = 30000; @@ -85,7 +86,7 @@ public class TestFindQuantiles { } private float[] getProbVec(Tuple values) throws Exception { - float[] probVec = new float[values.size()]; + float[] probVec = new float[values.size()]; for(int i = 0; i < values.size(); i++) { probVec[i] = (Float)values.get(i); } @@ -94,7 +95,7 @@ public class TestFindQuantiles { private DataBag generateRandomSortedSamples(int numSamples, int max) throws Exception { Random rand = new Random(1000); - List<Tuple> samples = new ArrayList<Tuple>(); + List<Tuple> samples = new ArrayList<Tuple>(); for (int i=0; i<numSamples; i++) { Tuple t = tFact.newTuple(1); t.set(0, rand.nextInt(max)); @@ -105,7 +106,7 @@ public class TestFindQuantiles { } private DataBag generateUniqueSamples(int numSamples) throws Exception { - DataBag samples = BagFactory.getInstance().newDefaultBag(); + DataBag samples = BagFactory.getInstance().newDefaultBag(); for (int i=0; i<numSamples; i++) { Tuple t = tFact.newTuple(1); t.set(0, new Integer(23)); @@ -120,9 +121,9 @@ public class TestFindQuantiles { in.set(0, new Integer(numReduceres)); in.set(1, samples); - + FindQuantiles fq = new FindQuantiles(); - + Map<String, Object> res = fq.exec(in); return res; } @@ -134,11 +135,12 @@ public class TestFindQuantiles { InternalMap weightedPartsData = (InternalMap) res.get(FindQuantiles.WEIGHTED_PARTS); Iterator<Object> it = weightedPartsData.values().iterator(); float[] probVec = getProbVec((Tuple)it.next()); + new DiscreteProbabilitySampleGenerator(probVec); float sum = 0.0f; for (float f : probVec) { sum += f; } return sum; } - + } Modified: pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java Fri Feb 24 03:34:37 2017 @@ -30,7 +30,6 @@ import java.util.List; import java.util.Random; import org.apache.pig.PigServer; -import org.apache.pig.builtin.mock.Storage; import org.apache.pig.data.Tuple; import org.apache.pig.test.utils.TestHelper; import org.junit.Test; @@ -106,31 +105,6 @@ public class TestForEachNestedPlanLocal } @Test - public void testNestedCrossTwoRelationsLimit() throws Exception { - Storage.Data data = Storage.resetData(pig); - data.set("input", - Storage.tuple(Storage.bag(Storage.tuple(1, 1), Storage.tuple(1, 2)), Storage.bag(Storage.tuple(1, 3), Storage.tuple(1, 4))), - Storage.tuple(Storage.bag(Storage.tuple(2, 1), Storage.tuple(2, 2)), Storage.bag(Storage.tuple(2, 3))), - Storage.tuple(Storage.bag(Storage.tuple(3, 1)), Storage.bag(Storage.tuple(3, 2)))); - - pig.setBatchOn(); - pig.registerQuery("A = load 'input' using mock.Storage() as (bag1:bag{tup1:tuple(f1:int, f2:int)}, bag2:bag{tup2:tuple(f3:int, f4:int)});"); - pig.registerQuery("B = foreach A {" - + "crossed = cross bag1, bag2;" - + "filtered = filter crossed by f1 == f3;" - + "lmt = limit filtered 1;" - + "generate FLATTEN(lmt);" + "}"); - pig.registerQuery("store B into 'output' using mock.Storage();"); - - pig.executeBatch(); - - List<Tuple> actualResults = data.get("output"); - List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings( - new String[] {"(1, 1, 1, 3)", "(2, 1, 2, 3)", "(3, 1, 3, 2)"}); - Util.checkQueryOutputs(actualResults.iterator(), expectedResults); - } - - @Test public void testNestedCrossTwoRelationsComplex() throws Exception { File[] tmpFiles = generateDataSetFilesForNestedCross(); List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStringAsByteArray(new String[] { Modified: pig/branches/spark/test/org/apache/pig/test/TestGFCross.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestGFCross.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestGFCross.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestGFCross.java Fri Feb 24 03:34:37 2017 @@ -20,7 +20,6 @@ package org.apache.pig.test; import static org.junit.Assert.assertEquals; import org.apache.hadoop.conf.Configuration; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; import org.apache.pig.data.DataBag; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; @@ -51,7 +50,6 @@ public class TestGFCross { public void testSerial() throws Exception { Configuration cfg = new Configuration(); cfg.set(PigImplConstants.PIG_CROSS_PARALLELISM + ".1", "1"); - cfg.set(MRConfiguration.TASK_ID, "task_1473802673416_1808_m_000000"); UDFContext.getUDFContext().addJobConf(cfg); Tuple t = TupleFactory.getInstance().newTuple(2); @@ -68,7 +66,6 @@ public class TestGFCross { public void testParallelSet() throws Exception { Configuration cfg = new Configuration(); cfg.set(PigImplConstants.PIG_CROSS_PARALLELISM + ".1", "10"); - cfg.set(MRConfiguration.TASK_ID, "task_14738102975522_0001_r_000000"); UDFContext.getUDFContext().addJobConf(cfg); Tuple t = TupleFactory.getInstance().newTuple(2); Modified: pig/branches/spark/test/org/apache/pig/test/TestGrunt.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestGrunt.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestGrunt.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestGrunt.java Fri Feb 24 03:34:37 2017 @@ -28,7 +28,6 @@ import java.io.File; import java.io.FileInputStream; import java.io.FileReader; import java.io.FileWriter; -import java.io.FilenameFilter; import java.io.InputStream; import java.io.InputStreamReader; import java.io.PrintWriter; @@ -971,6 +970,7 @@ public class TestGrunt { @Test public void testStopOnFailure() throws Throwable { + Assume.assumeTrue("Skip this test for TEZ", Util.isMapredExecType(cluster.getExecType())); PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties()); PigContext context = server.getPigContext(); context.getProperties().setProperty("stop.on.failure", ""+true); @@ -1569,20 +1569,4 @@ public class TestGrunt { } assertTrue(found); } - - @Test - public void testGruntUtf8() throws Throwable { - String command = "mkdir æµè¯\n" + - "quit\n"; - System.setProperty("jline.WindowsTerminal.directConsole", "false"); - System.setIn(new ByteArrayInputStream(command.getBytes())); - org.apache.pig.PigRunner.run(new String[] {"-x", "local"}, null); - File[] partFiles = new File(".").listFiles(new FilenameFilter() { - public boolean accept(File dir, String name) { - return name.equals("æµè¯"); - } - }); - assertEquals(partFiles.length, 1); - new File("æµè¯").delete(); - } } Modified: pig/branches/spark/test/org/apache/pig/test/TestHBaseStorage.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestHBaseStorage.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestHBaseStorage.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestHBaseStorage.java Fri Feb 24 03:34:37 2017 @@ -71,16 +71,12 @@ public class TestHBaseStorage { private static final String TESTTABLE_1 = "pigtable_1"; private static final String TESTTABLE_2 = "pigtable_2"; private static final byte[] COLUMNFAMILY = Bytes.toBytes("pig"); - private static final byte[] COLUMNFAMILY2 = Bytes.toBytes("pig2"); private static final String TESTCOLUMN_A = "pig:col_a"; private static final String TESTCOLUMN_B = "pig:col_b"; private static final String TESTCOLUMN_C = "pig:col_c"; private static final int TEST_ROW_COUNT = 100; - private enum TableType {ONE_CF, TWO_CF}; - private TableType lastTableType; - @BeforeClass public static void setUp() throws Exception { // This is needed by Pig @@ -317,13 +313,13 @@ public class TestHBaseStorage { */ @Test public void testLoadWithMap_3_col_prefix() throws IOException { - prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText, TableType.TWO_CF); + prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText); pig.registerQuery("a = load 'hbase://" + TESTTABLE_1 + "' using " + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('" - + "pig2:* pig:prefixed_col_*" + + "pig:col_* pig:prefixed_col_*" + "','-loadKey') as (rowKey:chararray, pig_cf_map:map[], pig_prefix_cf_map:map[]);"); Iterator<Tuple> it = pig.openIterator("a"); int count = 0; @@ -332,18 +328,24 @@ public class TestHBaseStorage { Tuple t = it.next(); LOG.info("LoadFromHBase " + t); String rowKey = t.get(0).toString(); - Map pig_secondery_cf_map = (Map) t.get(1); + Map pig_cf_map = (Map) t.get(1); Map pig_prefix_cf_map = (Map) t.get(2); Assert.assertEquals(3, t.size()); Assert.assertEquals("00".substring((count + "").length()) + count, rowKey); - Assert.assertEquals(count, - Integer.parseInt(pig_secondery_cf_map.get("col_x").toString())); Assert.assertEquals("PrefixedText_" + count, ((DataByteArray) pig_prefix_cf_map.get("prefixed_col_d")).toString()); Assert.assertEquals(1, pig_prefix_cf_map.size()); + Assert.assertEquals(count, + Integer.parseInt(pig_cf_map.get("col_a").toString())); + Assert.assertEquals(count + 0.0, + Double.parseDouble(pig_cf_map.get("col_b").toString()), 1e-6); + Assert.assertEquals("Text_" + count, + ((DataByteArray) pig_cf_map.get("col_c")).toString()); + Assert.assertEquals(3, pig_cf_map.size()); + count++; } Assert.assertEquals(TEST_ROW_COUNT, count); @@ -432,39 +434,6 @@ public class TestHBaseStorage { LOG.info("LoadFromHBase done"); } - public void testLoadWithFixedAndPrefixedCols3() throws IOException { - prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText); - - pig.registerQuery("a = load 'hbase://" - + TESTTABLE_1 - + "' using " - + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('" - + "pig:* pig:prefixed_col_*" - + "','-loadKey') as (rowKey:chararray, pig_cf_map:map[], pig_prefix_cf_map:map[]);"); - Iterator<Tuple> it = pig.openIterator("a"); - int count = 0; - LOG.info("LoadFromHBase Starting"); - while (it.hasNext()) { - Tuple t = it.next(); - LOG.info("LoadFromHBase " + t); - String rowKey = (String) t.get(0); - Map pig_cf_map = (Map) t.get(1); - Map pig_prefix_cf_map = (Map) t.get(2); - Assert.assertEquals(3, t.size()); - - Assert.assertEquals("00".substring((count + "").length()) + count, - rowKey); - Assert.assertEquals("PrefixedText_" + count, - ((DataByteArray) pig_cf_map.get("prefixed_col_d")).toString()); - Assert.assertEquals(1, pig_cf_map.size()); - Assert.assertEquals(1, pig_prefix_cf_map.size()); - - count++; - } - Assert.assertEquals(TEST_ROW_COUNT, count); - LOG.info("LoadFromHBase done"); - } - /** * * Test Load from hbase with map parameters and with a * static column in different order @@ -1517,36 +1486,22 @@ public class TestHBaseStorage { + "') as (rowKey:chararray,col_a:int, col_b:double, col_c:chararray);"); } - private HTable prepareTable(String tableName, boolean initData, - DataFormat format) throws IOException { - return prepareTable(tableName, initData, format, TableType.ONE_CF); - } /** * Prepare a table in hbase for testing. * */ private HTable prepareTable(String tableName, boolean initData, - DataFormat format, TableType type) throws IOException { + DataFormat format) throws IOException { // define the table schema HTable table = null; try { - if (lastTableType == type) { - deleteAllRows(tableName); - } else { - util.deleteTable(tableName); - } + deleteAllRows(tableName); } catch (Exception e) { // It's ok, table might not exist. } try { - if (type == TableType.TWO_CF) { - table = util.createTable(Bytes.toBytesBinary(tableName), - new byte[][]{COLUMNFAMILY, COLUMNFAMILY2}); - } else { - table = util.createTable(Bytes.toBytesBinary(tableName), - COLUMNFAMILY); - } - lastTableType = type; + table = util.createTable(Bytes.toBytesBinary(tableName), + COLUMNFAMILY); } catch (Exception e) { table = new HTable(conf, Bytes.toBytesBinary(tableName)); } @@ -1573,11 +1528,6 @@ public class TestHBaseStorage { // prefixed_col_d: string type put.add(COLUMNFAMILY, Bytes.toBytes("prefixed_col_d"), Bytes.toBytes("PrefixedText_" + i)); - // another cf - if (type == TableType.TWO_CF) { - put.add(COLUMNFAMILY2, Bytes.toBytes("col_x"), - Bytes.toBytes(i)); - } table.put(put); } else { // row key: string type @@ -1598,11 +1548,6 @@ public class TestHBaseStorage { // prefixed_col_d: string type put.add(COLUMNFAMILY, Bytes.toBytes("prefixed_col_d"), ("PrefixedText_" + i).getBytes()); - // another cf - if (type == TableType.TWO_CF) { - put.add(COLUMNFAMILY2, Bytes.toBytes("col_x"), - (i + "").getBytes()); - } table.put(put); } } Modified: pig/branches/spark/test/org/apache/pig/test/TestJobControlCompiler.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestJobControlCompiler.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestJobControlCompiler.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestJobControlCompiler.java Fri Feb 24 03:34:37 2017 @@ -63,6 +63,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad; +import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.builtin.PigStorage; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.io.FileSpec; @@ -130,7 +131,7 @@ public class TestJobControlCompiler { // verifying the jar gets on distributed cache Path[] fileClassPaths = DistributedCache.getFileClassPaths(jobConf); // guava jar is not shipped with Hadoop 2.x - Assert.assertEquals("size for "+Arrays.toString(fileClassPaths), 5, fileClassPaths.length); + Assert.assertEquals("size for "+Arrays.toString(fileClassPaths), HadoopShims.isHadoopYARN() ? 5 : 6, fileClassPaths.length); Path distributedCachePath = fileClassPaths[0]; Assert.assertEquals("ends with jar name: "+distributedCachePath, distributedCachePath.getName(), tmpFile.getName()); // hadoop bug requires path to not contain hdfs://hotname in front @@ -234,12 +235,22 @@ public class TestJobControlCompiler { // 4. another.jar and 5. udf1.jar, and not duplicate udf.jar System.out.println("cache.files= " + Arrays.toString(cacheURIs)); System.out.println("classpath.files= " + Arrays.toString(fileClassPaths)); - // Default jars - 5 (pig, antlr, joda-time, automaton) - // Other jars - 10 (udf.jar#udf.jar, udf1.jar#diffname.jar, udf2.jar, udf1.jar, another.jar - Assert.assertEquals("size 9 for " + Arrays.toString(cacheURIs), 9, - Arrays.asList(StringUtils.join(cacheURIs, ",").split(",")).size()); - Assert.assertEquals("size 9 for " + Arrays.toString(fileClassPaths), 9, - Arrays.asList(StringUtils.join(fileClassPaths, ",").split(",")).size()); + if (HadoopShims.isHadoopYARN()) { + // Default jars - 5 (pig, antlr, joda-time, automaton) + // Other jars - 10 (udf.jar#udf.jar, udf1.jar#diffname.jar, udf2.jar, udf1.jar, another.jar + Assert.assertEquals("size 9 for " + Arrays.toString(cacheURIs), 9, + Arrays.asList(StringUtils.join(cacheURIs, ",").split(",")).size()); + Assert.assertEquals("size 9 for " + Arrays.toString(fileClassPaths), 9, + Arrays.asList(StringUtils.join(fileClassPaths, ",").split(",")).size()); + } else { + // Default jars - 5. Has guava in addition + // There will be same entries duplicated for udf.jar and udf2.jar + Assert.assertEquals("size 12 for " + Arrays.toString(cacheURIs), 12, + Arrays.asList(StringUtils.join(cacheURIs, ",").split(",")).size()); + Assert.assertEquals("size 12 for " + Arrays.toString(fileClassPaths), 12, + Arrays.asList(StringUtils.join(fileClassPaths, ",").split(",")).size()); + } + // Count occurrences of the resources Map<String, Integer> occurrences = new HashMap<String, Integer>(); @@ -248,12 +259,22 @@ public class TestJobControlCompiler { val = (val == null) ? 1 : ++val; occurrences.put(cacheURI.toString(), val); } - Assert.assertEquals(9, occurrences.size()); + if (HadoopShims.isHadoopYARN()) { + Assert.assertEquals(9, occurrences.size()); + } else { + Assert.assertEquals(10, occurrences.size()); //guava jar in addition + } for (String file : occurrences.keySet()) { - // check that only single occurrence even though we added once to dist cache (simulating via Oozie) - // and second time through pig register jar when there is symlink - Assert.assertEquals("One occurrence for " + file, 1, (int) occurrences.get(file)); + if (!HadoopShims.isHadoopYARN() && (file.endsWith("udf.jar") || file.endsWith("udf2.jar"))) { + // Same path added twice which is ok. It should not be a shipped to hdfs temp path. + // We assert path is same by checking count + Assert.assertEquals("Two occurrences for " + file, 2, (int) occurrences.get(file)); + } else { + // check that only single occurrence even though we added once to dist cache (simulating via Oozie) + // and second time through pig register jar when there is symlink + Assert.assertEquals("One occurrence for " + file, 1, (int) occurrences.get(file)); + } } }
