Author: rohini
Date: Thu Jan 21 22:38:45 2016
New Revision: 1726124

URL: http://svn.apache.org/viewvc?rev=1726124&view=rev
Log:
PIG-4587: Applying isFirstReduceOfKey for Skewed left outer join skips records 
(rohini)

Modified:
    pig/branches/branch-0.15/CHANGES.txt
    
pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    
pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
    pig/branches/branch-0.15/src/org/apache/pig/impl/util/CompilerUtils.java
    pig/branches/branch-0.15/test/e2e/pig/tests/nightly.conf

Modified: pig/branches/branch-0.15/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/pig/branches/branch-0.15/CHANGES.txt?rev=1726124&r1=1726123&r2=1726124&view=diff
==============================================================================
--- pig/branches/branch-0.15/CHANGES.txt (original)
+++ pig/branches/branch-0.15/CHANGES.txt Thu Jan 21 22:38:45 2016
@@ -28,6 +28,8 @@ OPTIMIZATIONS
 
 BUG FIXES
 
+PIG-4587: Applying isFirstReduceOfKey for Skewed left outer join skips records (rohini)
+
 PIG-4760: TezDAGStats.convertToHadoopCounters is not used, but impose MR counter limit (daijy)
 
 PIG-4696: Empty map returned by a streaming_python udf wrongly contains a null key (cheolsoo)

Modified: 
pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: 
http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1726124&r1=1726123&r2=1726124&view=diff
==============================================================================
--- pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Thu Jan 21 22:38:45 2016
@@ -1945,8 +1945,13 @@ public class MRCompiler extends PhyPlanV
                 ep.add(prj);
                 eps.add(ep);
                 if (!inner[i]) {
-                    // Add an empty bag for outer join
-                    CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i), true, IsFirstReduceOfKey.class.getName());
+                    // Add an empty bag for outer join.
+                    if (i == 0) {
+                        // For right outer, add IsFirstReduceOfKey UDF as well
+                        CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i), true, IsFirstReduceOfKey.class.getName());
+                    } else {
+                        CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i), false, IsFirstReduceOfKey.class.getName());
+                    }
                 }
                 flat.add(true);
             }

Modified: 
pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
URL: 
http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java?rev=1726124&r1=1726123&r2=1726124&view=diff
==============================================================================
--- pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java (original)
+++ pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java Thu Jan 21 22:38:45 2016
@@ -1639,7 +1639,7 @@ public class TezCompiler extends PhyPlan
             List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
             List<Boolean> flat = new ArrayList<Boolean>();
 
-            boolean containsOuter = false;
+            boolean containsRightOuter = false;
             // Add corresponding POProjects
             for (int i=0; i < 2; i++) {
                 ep = new PhysicalPlan();
@@ -1650,9 +1650,13 @@ public class TezCompiler extends PhyPlan
                 ep.add(prj);
                 eps.add(ep);
                 if (!inner[i]) {
-                    // Add an empty bag for outer join
-                    CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i), true, IsFirstReduceOfKeyTez.class.getName());
-                    containsOuter = true;
+                    // Add an empty bag for outer join. For right outer, add IsFirstReduceOfKeyTez UDF as well
+                    if (i == 0) {
+                        containsRightOuter = true;
+                        CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i), true, IsFirstReduceOfKeyTez.class.getName());
+                    } else {
+                        CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i), false, IsFirstReduceOfKeyTez.class.getName());
+                    }
                 }
                 flat.add(true);
             }
@@ -1683,7 +1687,7 @@ public class TezCompiler extends PhyPlan
 
             POValueOutputTez sampleOut = (POValueOutputTez) sampleJobPair.first.plan.getLeaves().get(0);
             for (int i = 0; i <= 2; i++) {
-                if (i != 2 || containsOuter) {
+                if (i != 2 || containsRightOuter) {
                     // We need to send sample to left relation partitioner vertex, right relation load vertex,
                     // and join vertex (IsFirstReduceOfKey in join vertex need sample file as well)
                     joinJobs[i].setSampleOperator(sampleJobPair.first);

Modified: 
pig/branches/branch-0.15/src/org/apache/pig/impl/util/CompilerUtils.java
URL: 
http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/impl/util/CompilerUtils.java?rev=1726124&r1=1726123&r2=1726124&view=diff
==============================================================================
--- pig/branches/branch-0.15/src/org/apache/pig/impl/util/CompilerUtils.java (original)
+++ pig/branches/branch-0.15/src/org/apache/pig/impl/util/CompilerUtils.java Thu Jan 21 22:38:45 2016
@@ -42,43 +42,43 @@ import org.apache.pig.impl.plan.NodeIdGe
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanException;
 
-/* 
+/*
  * A class to add util functions that gets used by LogToPhyTranslator and MRCompiler
- * 
+ *
  */
 public class CompilerUtils {
 
     public static void addEmptyBagOuterJoin(PhysicalPlan fePlan, Schema inputSchema,
-            boolean skewedJoin, String isFirstReduceOfKeyClassName) throws PlanException {
+            boolean skewedRightOuterJoin, String isFirstReduceOfKeyClassName) throws PlanException {
         // we currently have POProject[bag] as the only operator in the plan
         // If the bag is an empty bag, we should replace
         // it with a bag with one tuple with null fields so that when we flatten
         // we do not drop records (flatten will drop records if the bag is left
-        // as an empty bag) and actually project nulls for the fields in 
+        // as an empty bag) and actually project nulls for the fields in
         // the empty bag
-        
+
         // So we need to get to the following state:
         // POProject[Bag]
-        //         \     
-        //    POUserFunc["IsEmpty()"] Const[Bag](bag with null fields)   
-        //                        \      |    POProject[Bag]             
+        //         \
+        //    POUserFunc["IsEmpty()"] Const[Bag](bag with null fields)
+        //                        \      |    POProject[Bag]
         //                         \     |    /
         //                          POBinCond
-        // Further, if it is skewed join, only the first reduce of the key
+        // Further, if it is skewed right outer join, only the first reduce of the key
         // will generate tuple with null fields (See PIG-4377)
-        // 
+        //
         // POProject[key]              POProject[Bag]
         //         \                      /
         //      IsFirstReduceOfKey  POUserFunc["IsEmpty()"]
         //                   \        /
         //                    \      /
-        //                       AND  Const[Bag](bag with null fields)   
-        //                        \      |    POProject[Bag]             
+        //                       AND  Const[Bag](bag with null fields)
+        //                        \      |    POProject[Bag]
         //                         \     |    /
         //                          POBinCond
         POProject relationProject = (POProject) fePlan.getRoots().get(0);
         try {
-            
+
             // condition of the bincond
             POProject relationProjectForIsEmpty = relationProject.clone();
             fePlan.add(relationProjectForIsEmpty);
@@ -92,7 +92,7 @@ public class CompilerUtils {
             fePlan.connect(relationProjectForIsEmpty, isEmpty);
 
             ExpressionOperator cond;
-            if (skewedJoin) {
+            if (skewedRightOuterJoin) {
                 POProject projectForKey = new POProject(new OperatorKey(scope,NodeIdGenerator.getGenerator().getNextNodeId(scope)));
                 projectForKey.setColumn(0);
                 projectForKey.setOverloaded(false);
@@ -119,7 +119,7 @@ public class CompilerUtils {
             } else {
                 cond = isEmpty;
             }
-            
+
             // lhs of bincond (const bag with null fields)
             ConstantExpression ce = new ConstantExpression(new OperatorKey(scope,
                     NodeIdGenerator.getGenerator().getNextNodeId(scope)));
@@ -136,7 +136,7 @@ public class CompilerUtils {
             ce.setResultType(DataType.BAG);
             //this operator doesn't have any predecessors
             fePlan.add(ce);
-            
+
             //rhs of bincond is the original project
             // let's set up the bincond now
             POBinCond bincond = new POBinCond(new OperatorKey(scope,
@@ -154,7 +154,7 @@ public class CompilerUtils {
         } catch (Exception e) {
             throw new PlanException("Error setting up outerjoin", e);
         }
-       
+
     }
 
 }

Modified: pig/branches/branch-0.15/test/e2e/pig/tests/nightly.conf
URL: 
http://svn.apache.org/viewvc/pig/branches/branch-0.15/test/e2e/pig/tests/nightly.conf?rev=1726124&r1=1726123&r2=1726124&view=diff
==============================================================================
--- pig/branches/branch-0.15/test/e2e/pig/tests/nightly.conf (original)
+++ pig/branches/branch-0.15/test/e2e/pig/tests/nightly.conf Thu Jan 21 22:38:45 2016
@@ -3113,6 +3113,37 @@ c = foreach b generate flatten($0);
 store c into ':OUTPATH:';\,
 
                         },
+                        # left outer join with fixed memory
+                        {
+                        'num' => 13,
+                        'java_params' => ['-Dpig.skewedjoin.reduce.maxtuple=100 -Dpig.skewedjoin.reduce.mem=516947966'],
+                        'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa);
+b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+b = filter b by name < 'b';
+e = join a by name left outer, b by name using 'skewed' parallel 8;
+store e into ':OUTPATH:';\,
+                        'verify_pig_script' => q\a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa);
+b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+b = filter b by name < 'b';
+e = join a by name left outer, b by name ;
+store e into ':OUTPATH:';\,
+                        },
+                        # full outer join with fixed memory
+                        {
+                        'num' => 14,
+                        'java_params' => ['-Dpig.skewedjoin.reduce.maxtuple=100 -Dpig.skewedjoin.reduce.mem=516947966'],
+                        'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa);
+b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+b = filter b by name > 'm';
+e = join a by name full outer, b by name using 'skewed' parallel 8;
+store e into ':OUTPATH:';\,
+                        'verify_pig_script' => q\a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa);
+b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+b = filter b by name > 'm';
+e = join a by name full outer, b by name ;
+store e into ':OUTPATH:';\,
+
+                        },
                 ]
 
             },


Reply via email to