Author: rohini
Date: Thu Jan 21 22:38:45 2016
New Revision: 1726124
URL: http://svn.apache.org/viewvc?rev=1726124&view=rev
Log:
PIG-4587: Applying isFirstReduceOfKey for Skewed left outer join skips records (rohini)
Modified:
pig/branches/branch-0.15/CHANGES.txt
pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
pig/branches/branch-0.15/src/org/apache/pig/impl/util/CompilerUtils.java
pig/branches/branch-0.15/test/e2e/pig/tests/nightly.conf
Modified: pig/branches/branch-0.15/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/CHANGES.txt?rev=1726124&r1=1726123&r2=1726124&view=diff
==============================================================================
--- pig/branches/branch-0.15/CHANGES.txt (original)
+++ pig/branches/branch-0.15/CHANGES.txt Thu Jan 21 22:38:45 2016
@@ -28,6 +28,8 @@ OPTIMIZATIONS
BUG FIXES
+PIG-4587: Applying isFirstReduceOfKey for Skewed left outer join skips records (rohini)
+
PIG-4760: TezDAGStats.convertToHadoopCounters is not used, but impose MR counter limit (daijy)
PIG-4696: Empty map returned by a streaming_python udf wrongly contains a null key (cheolsoo)
Modified: pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1726124&r1=1726123&r2=1726124&view=diff
==============================================================================
--- pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Thu Jan 21 22:38:45 2016
@@ -1945,8 +1945,13 @@ public class MRCompiler extends PhyPlanV
ep.add(prj);
eps.add(ep);
if (!inner[i]) {
- // Add an empty bag for outer join
- CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i), true, IsFirstReduceOfKey.class.getName());
+ // Add an empty bag for outer join.
+ if (i == 0) {
+ // For right outer, add IsFirstReduceOfKey UDF as well
+ CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i), true, IsFirstReduceOfKey.class.getName());
+ } else {
+ CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i), false, IsFirstReduceOfKey.class.getName());
+ }
}
flat.add(true);
}
Modified: pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java?rev=1726124&r1=1726123&r2=1726124&view=diff
==============================================================================
--- pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java (original)
+++ pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java Thu Jan 21 22:38:45 2016
@@ -1639,7 +1639,7 @@ public class TezCompiler extends PhyPlan
List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
List<Boolean> flat = new ArrayList<Boolean>();
- boolean containsOuter = false;
+ boolean containsRightOuter = false;
// Add corresponding POProjects
for (int i=0; i < 2; i++) {
ep = new PhysicalPlan();
@@ -1650,9 +1650,13 @@ public class TezCompiler extends PhyPlan
ep.add(prj);
eps.add(ep);
if (!inner[i]) {
- // Add an empty bag for outer join
- CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i), true, IsFirstReduceOfKeyTez.class.getName());
- containsOuter = true;
+ // Add an empty bag for outer join. For right outer, add IsFirstReduceOfKeyTez UDF as well
+ if (i == 0) {
+ containsRightOuter = true;
+ CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i), true, IsFirstReduceOfKeyTez.class.getName());
+ } else {
+ CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i), false, IsFirstReduceOfKeyTez.class.getName());
+ }
}
flat.add(true);
}
@@ -1683,7 +1687,7 @@ public class TezCompiler extends PhyPlan
POValueOutputTez sampleOut = (POValueOutputTez) sampleJobPair.first.plan.getLeaves().get(0);
for (int i = 0; i <= 2; i++) {
- if (i != 2 || containsOuter) {
+ if (i != 2 || containsRightOuter) {
// We need to send sample to left relation partitioner vertex, right relation load vertex,
// and join vertex (IsFirstReduceOfKey in join vertex need sample file as well)
joinJobs[i].setSampleOperator(sampleJobPair.first);
Modified: pig/branches/branch-0.15/src/org/apache/pig/impl/util/CompilerUtils.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/impl/util/CompilerUtils.java?rev=1726124&r1=1726123&r2=1726124&view=diff
==============================================================================
--- pig/branches/branch-0.15/src/org/apache/pig/impl/util/CompilerUtils.java (original)
+++ pig/branches/branch-0.15/src/org/apache/pig/impl/util/CompilerUtils.java Thu Jan 21 22:38:45 2016
@@ -42,43 +42,43 @@ import org.apache.pig.impl.plan.NodeIdGe
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanException;
-/*
+/*
* A class to add util functions that gets used by LogToPhyTranslator and MRCompiler
- *
+ *
*/
public class CompilerUtils {
public static void addEmptyBagOuterJoin(PhysicalPlan fePlan, Schema inputSchema,
- boolean skewedJoin, String isFirstReduceOfKeyClassName) throws PlanException {
+ boolean skewedRightOuterJoin, String isFirstReduceOfKeyClassName) throws PlanException {
// we currently have POProject[bag] as the only operator in the plan
// If the bag is an empty bag, we should replace
// it with a bag with one tuple with null fields so that when we flatten
// we do not drop records (flatten will drop records if the bag is left
- // as an empty bag) and actually project nulls for the fields in
+ // as an empty bag) and actually project nulls for the fields in
// the empty bag
-
+
// So we need to get to the following state:
// POProject[Bag]
- // \
- // POUserFunc["IsEmpty()"] Const[Bag](bag with null fields)
- // \ | POProject[Bag]
+ // \
+ // POUserFunc["IsEmpty()"] Const[Bag](bag with null fields)
+ // \ | POProject[Bag]
// \ | /
// POBinCond
- // Further, if it is skewed join, only the first reduce of the key
+ // Further, if it is skewed right outer join, only the first reduce of the key
// will generate tuple with null fields (See PIG-4377)
- //
+ //
// POProject[key] POProject[Bag]
// \ /
// IsFirstReduceOfKey POUserFunc["IsEmpty()"]
// \ /
// \ /
- // AND Const[Bag](bag with null fields)
- // \ | POProject[Bag]
+ // AND Const[Bag](bag with null fields)
+ // \ | POProject[Bag]
// \ | /
// POBinCond
POProject relationProject = (POProject) fePlan.getRoots().get(0);
try {
-
+
// condition of the bincond
POProject relationProjectForIsEmpty = relationProject.clone();
fePlan.add(relationProjectForIsEmpty);
@@ -92,7 +92,7 @@ public class CompilerUtils {
fePlan.connect(relationProjectForIsEmpty, isEmpty);
ExpressionOperator cond;
- if (skewedJoin) {
+ if (skewedRightOuterJoin) {
POProject projectForKey = new POProject(new OperatorKey(scope,NodeIdGenerator.getGenerator().getNextNodeId(scope)));
projectForKey.setColumn(0);
projectForKey.setOverloaded(false);
@@ -119,7 +119,7 @@ public class CompilerUtils {
} else {
cond = isEmpty;
}
-
+
// lhs of bincond (const bag with null fields)
ConstantExpression ce = new ConstantExpression(new OperatorKey(scope,
NodeIdGenerator.getGenerator().getNextNodeId(scope)));
@@ -136,7 +136,7 @@ public class CompilerUtils {
ce.setResultType(DataType.BAG);
//this operator doesn't have any predecessors
fePlan.add(ce);
-
+
//rhs of bincond is the original project
// let's set up the bincond now
POBinCond bincond = new POBinCond(new OperatorKey(scope,
@@ -154,7 +154,7 @@ public class CompilerUtils {
} catch (Exception e) {
throw new PlanException("Error setting up outerjoin", e);
}
-
+
}
}
Modified: pig/branches/branch-0.15/test/e2e/pig/tests/nightly.conf
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/test/e2e/pig/tests/nightly.conf?rev=1726124&r1=1726123&r2=1726124&view=diff
==============================================================================
--- pig/branches/branch-0.15/test/e2e/pig/tests/nightly.conf (original)
+++ pig/branches/branch-0.15/test/e2e/pig/tests/nightly.conf Thu Jan 21 22:38:45 2016
@@ -3113,6 +3113,37 @@ c = foreach b generate flatten($0);
store c into ':OUTPATH:';\,
},
+ # left outer join with fixed memory
+ {
+ 'num' => 13,
+ 'java_params' => ['-Dpig.skewedjoin.reduce.maxtuple=100 -Dpig.skewedjoin.reduce.mem=516947966'],
+ 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa);
+b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+b = filter b by name < 'b';
+e = join a by name left outer, b by name using 'skewed' parallel 8;
+store e into ':OUTPATH:';\,
+ 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa);
+b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+b = filter b by name < 'b';
+e = join a by name left outer, b by name ;
+store e into ':OUTPATH:';\,
+ },
+ # full outer join with fixed memory
+ {
+ 'num' => 14,
+ 'java_params' => ['-Dpig.skewedjoin.reduce.maxtuple=100 -Dpig.skewedjoin.reduce.mem=516947966'],
+ 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa);
+b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+b = filter b by name > 'm';
+e = join a by name full outer, b by name using 'skewed' parallel 8;
+store e into ':OUTPATH:';\,
+ 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa);
+b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+b = filter b by name > 'm';
+e = join a by name full outer, b by name ;
+store e into ':OUTPATH:';\,
+
+ },
]
},