Author: rohini
Date: Sat Jan  7 20:18:04 2017
New Revision: 1777853

URL: http://svn.apache.org/viewvc?rev=1777853&view=rev
Log:
PIG-4930: Skewed Join Breaks On Empty Sampled Input When Key is From Map (nkollar via rohini)

Modified:
    pig/branches/branch-0.16/CHANGES.txt
    pig/branches/branch-0.16/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
    pig/branches/branch-0.16/test/org/apache/pig/test/TestSkewedJoin.java

Modified: pig/branches/branch-0.16/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/CHANGES.txt?rev=1777853&r1=1777852&r2=1777853&view=diff
==============================================================================
--- pig/branches/branch-0.16/CHANGES.txt (original)
+++ pig/branches/branch-0.16/CHANGES.txt Sat Jan  7 20:18:04 2017
@@ -32,6 +32,8 @@ OPTIMIZATIONS
 
 BUG FIXES
 
+PIG-4930: Skewed Join Breaks On Empty Sampled Input When Key is From Map (nkollar via rohini)
+
 PIG-3417: Job fails when skewed join is done on tuple key (nkollar via rohini)
 
 PIG-5074: Build broken when hadoopversion=20 in branch 0.16 (szita via daijy)

Modified: pig/branches/branch-0.16/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java?rev=1777853&r1=1777852&r2=1777853&view=diff
==============================================================================
--- pig/branches/branch-0.16/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java (original)
+++ pig/branches/branch-0.16/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java Sat Jan  7 20:18:04 2017
@@ -90,7 +90,9 @@ public class PoissonSampleLoader extends
             // number of tuples to be skipped
             Tuple t = loader.getNext();
             if(t == null) {
-                return createNumRowTuple(null);
+                // since skipInterval is -1, no previous sample,
+                // and next sample is null -> the data set is empty
+                return null;
             }
             long availRedMem = (long) ( totalMemory * heapPerc);
             // availRedMem = 155084396;

Modified: pig/branches/branch-0.16/test/org/apache/pig/test/TestSkewedJoin.java
URL: 
http://svn.apache.org/viewvc/pig/branches/branch-0.16/test/org/apache/pig/test/TestSkewedJoin.java?rev=1777853&r1=1777852&r2=1777853&view=diff
==============================================================================
--- pig/branches/branch-0.16/test/org/apache/pig/test/TestSkewedJoin.java (original)
+++ pig/branches/branch-0.16/test/org/apache/pig/test/TestSkewedJoin.java Sat Jan  7 20:18:04 2017
@@ -65,6 +65,7 @@ public class TestSkewedJoin {
     private static final String INPUT_FILE5 = "SkewedJoinInput5.txt";
     private static final String INPUT_FILE6 = "SkewedJoinInput6.txt";
     private static final String INPUT_FILE7 = "SkewedJoinInput7.txt";
+    private static final String INPUT_FILE8 = "SkewedJoinInput8.txt";
     private static final String TEST_DIR = Util.getTestDirectory(TestSkewedJoin.class);
     private static final String INPUT_DIR = TEST_DIR + Path.SEPARATOR + "input";
     private static final String OUTPUT_DIR = TEST_DIR + Path.SEPARATOR + "output";
@@ -173,6 +174,11 @@ public class TestSkewedJoin {
         }
         w7.close();
 
+        //Empty file
+        PrintWriter w8 = new PrintWriter(new FileWriter(INPUT_DIR + "/" + INPUT_FILE8));
+        w8.close();
+
+
         Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE1, INPUT_FILE1);
         Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE2, INPUT_FILE2);
         Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE3, INPUT_FILE3);
@@ -180,6 +186,7 @@ public class TestSkewedJoin {
         Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE5, INPUT_FILE5);
         Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE6, INPUT_FILE6);
         Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE7, INPUT_FILE7);
+        Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE8, INPUT_FILE8);
     }
 
     private static void deleteFiles() throws IOException {
@@ -187,6 +194,21 @@ public class TestSkewedJoin {
     }
 
     @Test
+    public void testSkewedJoinMapLeftEmpty() throws IOException{
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE8 + "' as (idM:[]);");
+        pigServer.registerQuery("B = LOAD '" + INPUT_FILE1 + "' as (id, name, n);");
+        pigServer.registerQuery("C = join A by idM#'id', B by id using 'skewed' PARALLEL 2;");
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        int count = 0;
+        while(iter.hasNext()) {
+            count++;
+            iter.next();
+        }
+        assertEquals(0, count);
+    }
+
+
+    @Test
     public void testSkewedJoinWithGroup() throws IOException{
         pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' as (id, name, n);");
         pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as (id, name);");


Reply via email to