Author: knoguchi
Date: Fri Jan 25 21:51:40 2019
New Revision: 1852183
URL: http://svn.apache.org/viewvc?rev=1852183&view=rev
Log:
PIG-5372: SAMPLE/RANDOM(udf) before skewed join failing with NPE (knoguchi)
Modified: pig/trunk/CHANGES.txt pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java Modified: pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1852183&r1=1852182&r2=1852183&view=diff ============================================================================== --- pig/trunk/CHANGES.txt (original) +++ pig/trunk/CHANGES.txt Fri Jan 25 21:51:40 2019 @@ -87,6 +87,7 @@ PIG-5251: Bump joda-time to 2.9.9 (dbist OPTIMIZATIONS BUG FIXES +PIG-5372: SAMPLE/RANDOM(udf) before skewed join failing with NPE (knoguchi) PIG-5374: Use CircularFifoBuffer in InterRecordReader (szita) Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java?rev=1852183&r1=1852182&r2=1852183&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java Fri Jan 25 21:51:40 2019 @@ -112,8 +112,6 @@ public class SkewedPartitioner extends P @Override public void setConf(Configuration job) { conf = job; - PigMapReduce.sJobConfInternal.set(conf); - PigMapReduce.sJobConf = conf; } @Override Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java?rev=1852183&r1=1852182&r2=1852183&view=diff ============================================================================== --- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java Fri Jan 25 21:51:40 2019 @@ -93,10 +93,10 @@ public class MapRedUtil { conf.set("yarn.resourcemanager.principal", mapConf.get("yarn.resourcemanager.principal")); } - if (PigMapReduce.sJobConfInternal.get().get("fs.file.impl")!=null) - conf.set("fs.file.impl", PigMapReduce.sJobConfInternal.get().get("fs.file.impl")); - if (PigMapReduce.sJobConfInternal.get().get("fs.hdfs.impl")!=null) - conf.set("fs.hdfs.impl", PigMapReduce.sJobConfInternal.get().get("fs.hdfs.impl")); + if (mapConf.get("fs.file.impl")!=null) + conf.set("fs.file.impl", mapConf.get("fs.file.impl")); + if (mapConf.get("fs.hdfs.impl")!=null) + conf.set("fs.hdfs.impl", mapConf.get("fs.hdfs.impl")); copyTmpFileConfigurationValues(PigMapReduce.sJobConfInternal.get(), conf); Modified: pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java?rev=1852183&r1=1852182&r2=1852183&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java (original) +++ pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java Fri Jan 25 21:51:40 2019 @@ -207,7 +207,6 @@ public class TestSkewedJoin { assertEquals(0, count); } - @Test public void testSkewedJoinWithGroup() throws IOException{ pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' as (id, name, n);"); @@ -354,7 +353,7 @@ public class TestSkewedJoin { try { DataBag dbfrj = BagFactory.getInstance().newDefaultBag(); { - pigServer.registerQuery("C = join A by id, B by id using 'skewed';"); + pigServer.registerQuery("C = join A by id, B by id using 'skewed' parallel 2;"); Iterator<Tuple> iter = pigServer.openIterator("C"); while(iter.hasNext()) { @@ -375,7 +374,7 @@ public class TestSkewedJoin { pigServer.registerQuery("B = 
LOAD '" + INPUT_FILE5 + "' as (id,name);"); DataBag dbfrj = BagFactory.getInstance().newDefaultBag(); { - pigServer.registerQuery("C = join A by id left, B by id using 'skewed';"); + pigServer.registerQuery("C = join A by id left, B by id using 'skewed' parallel 2;"); Iterator<Tuple> iter = pigServer.openIterator("C"); while(iter.hasNext()) { @@ -383,7 +382,7 @@ public class TestSkewedJoin { } } { - pigServer.registerQuery("C = join A by id right, B by id using 'skewed';"); + pigServer.registerQuery("C = join A by id right, B by id using 'skewed' parallel 2;"); Iterator<Tuple> iter = pigServer.openIterator("C"); while(iter.hasNext()) { @@ -391,7 +390,7 @@ public class TestSkewedJoin { } } { - pigServer.registerQuery("C = join A by id full, B by id using 'skewed';"); + pigServer.registerQuery("C = join A by id full, B by id using 'skewed' parallel 2;"); Iterator<Tuple> iter = pigServer.openIterator("C"); while(iter.hasNext()) { @@ -413,7 +412,7 @@ public class TestSkewedJoin { DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbrj = BagFactory.getInstance().newDefaultBag(); { - pigServer.registerQuery("E = join C by id, D by id using 'skewed';"); + pigServer.registerQuery("E = join C by id, D by id using 'skewed' parallel 2;"); Iterator<Tuple> iter = pigServer.openIterator("E"); while(iter.hasNext()) { @@ -487,7 +486,7 @@ public class TestSkewedJoin { pigServer.registerQuery("a = load 'left.dat' as (nums:chararray);"); pigServer.registerQuery("b = load 'right.dat' as (number:chararray,text:chararray);"); pigServer.registerQuery("c = filter a by nums == '7';"); - pigServer.registerQuery("d = join c by nums LEFT OUTER, b by number USING 'skewed';"); + pigServer.registerQuery("d = join c by nums LEFT OUTER, b by number USING 'skewed' parallel 2;"); Iterator<Tuple> iter = pigServer.openIterator("d"); @@ -515,7 +514,7 @@ public class TestSkewedJoin { pigServer.registerQuery("a = load 'foo' as (nums:chararray);"); pigServer.registerQuery("b = load 'foo' as 
(nums:chararray);"); - pigServer.registerQuery("d = join a by nums, b by nums USING 'skewed';"); + pigServer.registerQuery("d = join a by nums, b by nums USING 'skewed' parallel 2;"); Iterator<Tuple> iter = pigServer.openIterator("d"); int count = 0; @@ -569,7 +568,7 @@ public class TestSkewedJoin { "exists = LOAD '" + INPUT_FILE2 + "' AS (a:long, x:chararray);" + "missing = LOAD '/non/existing/directory' AS (a:long);" + "missing = FOREACH ( GROUP missing BY a ) GENERATE $0 AS a, COUNT_STAR($1);" + - "joined = JOIN exists BY a, missing BY a USING 'skewed';"; + "joined = JOIN exists BY a, missing BY a USING 'skewed' parallel 2;"; String logFile = Util.createTempFileDelOnExit("tmp", ".log").getAbsolutePath(); Logger logger = Logger.getLogger("org.apache.pig"); @@ -619,4 +618,34 @@ public class TestSkewedJoin { } } + // PIG-5372 + @Test + public void testSkewedJoinWithRANDOMudf() throws IOException{ + pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' as (id, name, n);"); + pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as (id, name);"); + pigServer.registerQuery("A2 = FOREACH A GENERATE id, RANDOM() as randnum;"); + + DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag(); + { + pigServer.registerQuery("D = join A2 by id, B by id using 'skewed' parallel 2;"); + Iterator<Tuple> iter = pigServer.openIterator("D"); + + while(iter.hasNext()) { + dbfrj.add(iter.next()); + } + } + { + pigServer.registerQuery("D = join A2 by id, B by id;"); + Iterator<Tuple> iter = pigServer.openIterator("D"); + + while(iter.hasNext()) { + dbshj.add(iter.next()); + } + } + assertTrue(dbfrj.size()>0); + assertTrue(dbshj.size()>0); + assertEquals(dbfrj.size(), dbshj.size()); + } + + }