Author: rohini
Date: Sat Jan 7 20:18:04 2017
New Revision: 1777853
URL: http://svn.apache.org/viewvc?rev=1777853&view=rev
Log:
PIG-4930: Skewed Join Breaks On Empty Sampled Input When Key is From Map
(nkollar via rohini)
Modified:
pig/branches/branch-0.16/CHANGES.txt
pig/branches/branch-0.16/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
pig/branches/branch-0.16/test/org/apache/pig/test/TestSkewedJoin.java
Modified: pig/branches/branch-0.16/CHANGES.txt
URL:
http://svn.apache.org/viewvc/pig/branches/branch-0.16/CHANGES.txt?rev=1777853&r1=1777852&r2=1777853&view=diff
==============================================================================
--- pig/branches/branch-0.16/CHANGES.txt (original)
+++ pig/branches/branch-0.16/CHANGES.txt Sat Jan 7 20:18:04 2017
@@ -32,6 +32,8 @@ OPTIMIZATIONS
BUG FIXES
+PIG-4930: Skewed Join Breaks On Empty Sampled Input When Key is From Map (nkollar via rohini)
+
PIG-3417: Job fails when skewed join is done on tuple key (nkollar via rohini)
PIG-5074: Build broken when hadoopversion=20 in branch 0.16 (szita via daijy)
Modified:
pig/branches/branch-0.16/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
URL:
http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java?rev=1777853&r1=1777852&r2=1777853&view=diff
==============================================================================
--- pig/branches/branch-0.16/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java (original)
+++ pig/branches/branch-0.16/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java Sat Jan 7 20:18:04 2017
@@ -90,7 +90,9 @@ public class PoissonSampleLoader extends
// number of tuples to be skipped
Tuple t = loader.getNext();
if(t == null) {
- return createNumRowTuple(null);
+ // since skipInterval is -1, no previous sample,
+ // and next sample is null -> the data set is empty
+ return null;
}
long availRedMem = (long) ( totalMemory * heapPerc);
// availRedMem = 155084396;
Modified: pig/branches/branch-0.16/test/org/apache/pig/test/TestSkewedJoin.java
URL:
http://svn.apache.org/viewvc/pig/branches/branch-0.16/test/org/apache/pig/test/TestSkewedJoin.java?rev=1777853&r1=1777852&r2=1777853&view=diff
==============================================================================
--- pig/branches/branch-0.16/test/org/apache/pig/test/TestSkewedJoin.java (original)
+++ pig/branches/branch-0.16/test/org/apache/pig/test/TestSkewedJoin.java Sat Jan 7 20:18:04 2017
@@ -65,6 +65,7 @@ public class TestSkewedJoin {
private static final String INPUT_FILE5 = "SkewedJoinInput5.txt";
private static final String INPUT_FILE6 = "SkewedJoinInput6.txt";
private static final String INPUT_FILE7 = "SkewedJoinInput7.txt";
+ private static final String INPUT_FILE8 = "SkewedJoinInput8.txt";
private static final String TEST_DIR =
Util.getTestDirectory(TestSkewedJoin.class);
private static final String INPUT_DIR = TEST_DIR + Path.SEPARATOR +
"input";
private static final String OUTPUT_DIR = TEST_DIR + Path.SEPARATOR +
"output";
@@ -173,6 +174,11 @@ public class TestSkewedJoin {
}
w7.close();
+ //Empty file
+ PrintWriter w8 = new PrintWriter(new FileWriter(INPUT_DIR + "/" +
INPUT_FILE8));
+ w8.close();
+
+
Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE1,
INPUT_FILE1);
Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE2,
INPUT_FILE2);
Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE3,
INPUT_FILE3);
@@ -180,6 +186,7 @@ public class TestSkewedJoin {
Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE5,
INPUT_FILE5);
Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE6,
INPUT_FILE6);
Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE7,
INPUT_FILE7);
+ Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE8,
INPUT_FILE8);
}
private static void deleteFiles() throws IOException {
@@ -187,6 +194,21 @@ public class TestSkewedJoin {
}
@Test
+ public void testSkewedJoinMapLeftEmpty() throws IOException{
+ pigServer.registerQuery("A = LOAD '" + INPUT_FILE8 + "' as (idM:[]);");
+ pigServer.registerQuery("B = LOAD '" + INPUT_FILE1 + "' as (id, name,
n);");
+ pigServer.registerQuery("C = join A by idM#'id', B by id using
'skewed' PARALLEL 2;");
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+ int count = 0;
+ while(iter.hasNext()) {
+ count++;
+ iter.next();
+ }
+ assertEquals(0, count);
+ }
+
+
+ @Test
public void testSkewedJoinWithGroup() throws IOException{
pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' as (id, name,
n);");
pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as (id,
name);");