Author: rohini
Date: Wed Nov 29 19:38:32 2017
New Revision: 1816646

URL: http://svn.apache.org/viewvc?rev=1816646&view=rev
Log:
PIG-5310: MergeJoin throwing NullPointer Exception (satishsaley via rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
    pig/trunk/test/org/apache/pig/test/TestMergeJoin.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1816646&r1=1816645&r2=1816646&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Nov 29 19:38:32 2017
@@ -60,6 +60,8 @@ OPTIMIZATIONS
  
 BUG FIXES
 
+PIG-5310: MergeJoin throwing NullPointer Exception (satishsaley via rohini)
+
 PIG-5314: Abort method is not implemented in PigProcessor (satishsaley via rohini)
 
 PIG-5307: NPE in TezOperDependencyParallelismEstimator (rohini)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java?rev=1816646&r1=1816645&r2=1816646&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java Wed Nov 29 19:38:32 2017
@@ -523,13 +523,12 @@ public class POMergeJoin extends Physica
                 throw new ExecException(errMsg,errCode);
             }
 
-            int cmpval = rightKey.compareTo(prevLeftKey);
-            if(cmpval < 0) {    // still behind the left side, do nothing, fetch next right tuple.
+            if (prevLeftKey != null && rightKey.compareTo(prevLeftKey) < 0) {    // still behind the left side, do nothing, fetch next right tuple.
                 slidingToNextRecord = true;
                 continue;
             }
 
-            else if (cmpval == 0){  // Found matching tuple. Time to do join.
+            else if (prevLeftKey != null && rightKey.compareTo(prevLeftKey) == 0){  // Found matching tuple. Time to do join.
 
                 curJoiningRightTup = (Tuple)rightInp.result;
                 counter = leftTuples.size();

Modified: pig/trunk/test/org/apache/pig/test/TestMergeJoin.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestMergeJoin.java?rev=1816646&r1=1816645&r2=1816646&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestMergeJoin.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestMergeJoin.java Wed Nov 29 19:38:32 2017
@@ -714,7 +714,37 @@ public class TestMergeJoin {
         Assert.assertEquals(dbMergeJoin.size(), dbshj.size());
         Assert.assertEquals(true, TestHelper.compareBags(dbMergeJoin, dbshj)); 
       
     }
-    
+
+    @Test
+    public void testMergeJoinLeftInputSmallerThanRight() throws IOException {
+        String rightInput = "right_input.txt";
+        Util.createInputFile(cluster, rightInput, new String[] { "3", "4" });
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE2 + "' as (key:int);");
+        pigServer.registerQuery("B = LOAD '" + rightInput + "' as (key:int);");
+        DataBag dbMergeJoin = BagFactory.getInstance().newDefaultBag(),
+                dbshj = BagFactory.getInstance().newDefaultBag();
+        {
+            pigServer.registerQuery("C = join A by key, B by key using 'merge';");
+            Iterator<Tuple> iter = pigServer.openIterator("C");
+
+            while (iter.hasNext()) {
+                dbMergeJoin.add(iter.next());
+            }
+        }
+        {
+            pigServer.registerQuery("C = join A by key, B by key;");
+            Iterator<Tuple> iter = pigServer.openIterator("C");
+
+            while (iter.hasNext()) {
+                dbshj.add(iter.next());
+            }
+        }
+        Assert.assertEquals(dbMergeJoin.size(), dbshj.size());
+        Assert.assertEquals(0, dbMergeJoin.size());
+        Assert.assertEquals(true, TestHelper.compareBags(dbMergeJoin, dbshj));
+        Util.deleteFile(cluster, rightInput);
+    }
+
     /**
      * A dummy loader which implements {@link IndexableLoadFunc} to test
      * that expressions are not allowed as merge join keys when the right input's


Reply via email to