Author: daijy
Date: Tue Mar 24 16:42:37 2015
New Revision: 1668933

URL: http://svn.apache.org/r1668933
Log:
PIG-4458: Support UDFs in a FOREACH Before a Merge Join

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/docs/src/documentation/content/xdocs/perf.xml
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/MapSideMergeValidator.java
    pig/trunk/test/org/apache/pig/test/TestMergeJoin.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1668933&r1=1668932&r2=1668933&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Mar 24 16:42:37 2015
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
  
 IMPROVEMENTS
 
+PIG-4458: Support UDFs in a FOREACH Before a Merge Join (wattsinabox via daijy)
+
 PIG-4454: Upgrade tez to 0.6.0 (daijy)
 
 PIG-4451: Log partition and predicate filter pushdown information and fix optimizer looping (rohini)

Modified: pig/trunk/src/docs/src/documentation/content/xdocs/perf.xml
URL: http://svn.apache.org/viewvc/pig/trunk/src/docs/src/documentation/content/xdocs/perf.xml?rev=1668933&r1=1668932&r2=1668933&view=diff
==============================================================================
--- pig/trunk/src/docs/src/documentation/content/xdocs/perf.xml (original)
+++ pig/trunk/src/docs/src/documentation/content/xdocs/perf.xml Tue Mar 24 16:42:37 2015
@@ -1286,9 +1286,9 @@ C = JOIN A BY a1, B BY b1, C BY c1 USING
 <li>Data must come directly from either a Load or an Order statement.</li>
 <li>There may be filter statements and foreach statements between the sorted data source and the join statement. The foreach statement should meet the following conditions: 
   <ul>
-    <li>There should be no UDFs in the foreach statement. </li>
     <li>The foreach statement should not change the position of the join keys. </li>
-    <li>There should be no transformation on the join keys which will change the sort order. </li>
+    <li>There should be no transformation on the join keys which will change the sort order.</li>
+    <li>UDFs also have to adhere to the previous condition and should not transform the JOIN keys in a way that would change the sort order.</li>
   </ul>
 </li>
 <li>Data must be sorted on join keys in ascending (ASC) order on both sides.</li>

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java?rev=1668933&r1=1668932&r2=1668933&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java Tue Mar 24 16:42:37 2015
@@ -70,6 +70,12 @@ public class POMergeJoin extends Physica
 
     private static final long serialVersionUID = 1L;
 
+    private static final String keyOrderReminder = "Remember that you should " +
+        "not change the order of keys before a merge join in a FOREACH or " +
+        "manipulate join keys in a UDF in a way that would change the sort " +
+        "order. UDFs in a FOREACH are allowed as long as they do not change" +
+        "the join key values in a way that would change the sort order.\n";
+
     // flag to indicate when getNext() is called first.
     private boolean firstTime = true;
 
@@ -370,7 +376,9 @@ public class POMergeJoin extends Physica
                     }
                     else{   // At this point right side can't be behind.
                         int errCode = 1102;
-                        String errMsg = "Data is not sorted on right side. Last two tuples encountered were: \n"+
+                        String errMsg = "Data is not sorted on right side. \n" +
+                            keyOrderReminder +
+                            "Last two tuples encountered were: \n"+
                         curJoiningRightTup+ "\n" + (Tuple)rightInp.result ;
                         throw new ExecException(errMsg,errCode);
                     }    
@@ -398,7 +406,9 @@ public class POMergeJoin extends Physica
             }
             else{   // Current key < Prev Key
                 int errCode = 1102;
-                String errMsg = "Data is not sorted on left side. Last two keys encountered were: \n"+
+                String errMsg = "Data is not sorted on left side. \n" +
+                            keyOrderReminder +
+                            "Last two tuples encountered were: \n" +
                 prevLeftKey+ "\n" + curLeftKey ;
                 throw new ExecException(errMsg,errCode);
             }
@@ -468,7 +478,9 @@ public class POMergeJoin extends Physica
             if( prevRightKey != null && rightKey.compareTo(prevRightKey) < 0){
                 // Sanity check.
                 int errCode = 1102;
-                String errMsg = "Data is not sorted on right side. Last two keys encountered were: \n"+
+                String errMsg = "Data is not sorted on right side. \n" +
+                            keyOrderReminder +
+                            "Last two tuples encountered were: \n"+
                 prevRightKey+ "\n" + rightKey ;
                 throw new ExecException(errMsg,errCode);
             }

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/MapSideMergeValidator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/MapSideMergeValidator.java?rev=1668933&r1=1668932&r2=1668933&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/MapSideMergeValidator.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/MapSideMergeValidator.java Tue Mar 24 16:42:37 2015
@@ -59,8 +59,7 @@ public class MapSideMergeValidator {
     private boolean isAcceptableForEachOp(Operator lo) throws LogicalToPhysicalTranslatorException {
         if (lo instanceof LOForEach) {
             OperatorPlan innerPlan = ((LOForEach) lo).getInnerPlan();
-            validateMapSideMerge(innerPlan.getSinks(), innerPlan);
-            return !containsUDFs((LOForEach) lo);
+            return validateMapSideMerge(innerPlan.getSinks(), innerPlan);
         } else {
             return false;
         }
@@ -83,22 +82,4 @@ public class MapSideMergeValidator {
         }
         return true;
     }
-
-    private boolean containsUDFs(LOForEach fo) throws LogicalToPhysicalTranslatorException {
-        LogicalPlan logExpPlan = fo.getInnerPlan();
-        UDFFinder udfFinder;
-        try {
-            udfFinder = new UDFFinder(logExpPlan);
-            udfFinder.visit();
-            // TODO (dvryaboy): in the future we could relax this rule by tracing what fields
-            // are being passed into the UDF, and only refusing if the UDF is working on the
-            // join key. Transforms of other fields should be ok.
-            if (udfFinder.getUDFList().size() != 0) {
-                return true;
-            }
-        } catch (FrontendException e) {
-            throw new LogicalToPhysicalTranslatorException(e);
-        }
-        return false;
-    }
 }

Modified: pig/trunk/test/org/apache/pig/test/TestMergeJoin.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestMergeJoin.java?rev=1668933&r1=1668932&r2=1668933&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestMergeJoin.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestMergeJoin.java Tue Mar 24 16:42:37 2015
@@ -450,6 +450,33 @@ public class TestMergeJoin {
     }
 
     @Test
+    public void testMergeJoinWithUDF() throws Exception{
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (x:int,y:double);");
+        pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as (x:int,y:double);");
+        pigServer.registerQuery("A = FOREACH A GENERATE x, org.apache.pig.piggybank.evaluation.math.ABS(y) AS y;");
+
+        DataBag dbMergeJoin = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
+        {
+            pigServer.registerQuery("C = JOIN A BY x, B BY x USING 'merge';");
+            Iterator<Tuple> iter = pigServer.openIterator("C");
+
+            while(iter.hasNext()) {
+                dbMergeJoin.add(iter.next());
+            }
+        }
+        {
+            pigServer.registerQuery("C = JOIN A BY x, B BY x;");
+            Iterator<Tuple> iter = pigServer.openIterator("C");
+
+            while(iter.hasNext()) {
+                dbshj.add(iter.next());
+            }
+        }
+        Assert.assertEquals(dbMergeJoin.size(), dbshj.size());
+        Assert.assertEquals(true, TestHelper.compareBags(dbMergeJoin, dbshj));
+    }
+
+    @Test
     public void testMergeJoin3Way() throws IOException{
         try {
             pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (id, name, n);");
@@ -487,24 +514,6 @@ public class TestMergeJoin {
     }
 
     @Test
-    public void testMergeFailWithOrderUDF() throws Exception{
-        String query = "A = LOAD '" + INPUT_FILE + "' as (id, name, n);\n" +
-                "B = LOAD '" + INPUT_FILE + "' as (id, name);\n" +
-                "A = FOREACH A GENERATE LOWER($0) as id;\n" +
-                "C = ORDER B by $0 parallel 5;\n" +
-                "D = join A by id, C by id using 'merge';\n" +
-                "store D into '/dev/null/1';";
-        // verify that this fails parsing sanity checks.
-        try {
-            Util.buildPp(pigServer, query);
-        } catch (Throwable t) {
-            // expected to fail.
-            return;
-        }
-        Assert.fail("Allowed a Merge Join despite a UDF");
-    }       
-
-    @Test
     public void testMergeJoinFailure2() throws IOException{
         pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (id, name, n);");
         pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as (id, name);");


Reply via email to