Author: rohini
Date: Mon Jan 18 19:47:48 2016
New Revision: 1725329
URL: http://svn.apache.org/viewvc?rev=1725329&view=rev
Log:
PIG-4587: Applying isFirstReduceOfKey for Skewed left outer join skips records (rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
pig/trunk/src/org/apache/pig/impl/util/CompilerUtils.java
pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java
pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java
pig/trunk/test/e2e/pig/tests/nightly.conf
Modified: pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1725329&r1=1725328&r2=1725329&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Jan 18 19:47:48 2016
@@ -81,6 +81,8 @@ PIG-4639: Add better parser for Apache H
BUG FIXES
+PIG-4587: Applying isFirstReduceOfKey for Skewed left outer join skips records (rohini)
+
PIG-4782: OutOfMemoryError: GC overhead limit exceeded with POPartialAgg
(rohini)
PIG-4737: Check and fix clone implementation for all classes extending
PhysicalOperator (rohini)
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1725329&r1=1725328&r2=1725329&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
Mon Jan 18 19:47:48 2016
@@ -1945,8 +1945,13 @@ public class MRCompiler extends PhyPlanV
ep.add(prj);
eps.add(ep);
if (!inner[i]) {
- // Add an empty bag for outer join
- CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i),
true, IsFirstReduceOfKey.class.getName());
+ // Add an empty bag for outer join.
+ if (i == 0) {
+ // For right outer, add IsFirstReduceOfKey UDF as well
+ CompilerUtils.addEmptyBagOuterJoin(ep,
op.getSchema(i), true, IsFirstReduceOfKey.class.getName());
+ } else {
+ CompilerUtils.addEmptyBagOuterJoin(ep,
op.getSchema(i), false, IsFirstReduceOfKey.class.getName());
+ }
}
flat.add(true);
}
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java?rev=1725329&r1=1725328&r2=1725329&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
Mon Jan 18 19:47:48 2016
@@ -1682,7 +1682,7 @@ public class TezCompiler extends PhyPlan
List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
List<Boolean> flat = new ArrayList<Boolean>();
- boolean containsOuter = false;
+ boolean containsRightOuter = false;
// Add corresponding POProjects
for (int i=0; i < 2; i++) {
ep = new PhysicalPlan();
@@ -1693,9 +1693,13 @@ public class TezCompiler extends PhyPlan
ep.add(prj);
eps.add(ep);
if (!inner[i]) {
- // Add an empty bag for outer join
- CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i),
true, IsFirstReduceOfKeyTez.class.getName());
- containsOuter = true;
+ // Add an empty bag for outer join. For right outer, add IsFirstReduceOfKeyTez UDF as well
+ if (i == 0) {
+ containsRightOuter = true;
+ CompilerUtils.addEmptyBagOuterJoin(ep,
op.getSchema(i), true, IsFirstReduceOfKeyTez.class.getName());
+ } else {
+ CompilerUtils.addEmptyBagOuterJoin(ep,
op.getSchema(i), false, IsFirstReduceOfKeyTez.class.getName());
+ }
}
flat.add(true);
}
@@ -1714,7 +1718,7 @@ public class TezCompiler extends PhyPlan
POValueOutputTez sampleOut = (POValueOutputTez)
sampleJobPair.first.plan.getLeaves().get(0);
for (int i = 0; i <= 2; i++) {
- if (i != 2 || containsOuter) {
+ if (i != 2 || containsRightOuter) {
// We need to send sample to left relation partitioner vertex, right relation load vertex,
// and join vertex (IsFirstReduceOfKey in join vertex need sample file as well)
joinJobs[i].setSampleOperator(sampleJobPair.first);
Modified: pig/trunk/src/org/apache/pig/impl/util/CompilerUtils.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/CompilerUtils.java?rev=1725329&r1=1725328&r2=1725329&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/CompilerUtils.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/CompilerUtils.java Mon Jan 18
19:47:48 2016
@@ -42,43 +42,43 @@ import org.apache.pig.impl.plan.NodeIdGe
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanException;
-/*
+/*
* A class to add util functions that gets used by LogToPhyTranslator and MRCompiler
- *
+ *
*/
public class CompilerUtils {
public static void addEmptyBagOuterJoin(PhysicalPlan fePlan, Schema
inputSchema,
- boolean skewedJoin, String isFirstReduceOfKeyClassName) throws
PlanException {
+ boolean skewedRightOuterJoin, String isFirstReduceOfKeyClassName)
throws PlanException {
// we currently have POProject[bag] as the only operator in the plan
// If the bag is an empty bag, we should replace
// it with a bag with one tuple with null fields so that when we flatten
// we do not drop records (flatten will drop records if the bag is left
- // as an empty bag) and actually project nulls for the fields in
+ // as an empty bag) and actually project nulls for the fields in
// the empty bag
-
+
// So we need to get to the following state:
// POProject[Bag]
- // \
- // POUserFunc["IsEmpty()"] Const[Bag](bag with null fields)
- // \ | POProject[Bag]
+ // \
+ // POUserFunc["IsEmpty()"] Const[Bag](bag with null fields)
+ // \ | POProject[Bag]
// \ | /
// POBinCond
- // Further, if it is skewed join, only the first reduce of the key
+ // Further, if it is skewed right outer join, only the first reduce of the key
// will generate tuple with null fields (See PIG-4377)
- //
+ //
// POProject[key] POProject[Bag]
// \ /
// IsFirstReduceOfKey POUserFunc["IsEmpty()"]
// \ /
// \ /
- // AND Const[Bag](bag with null fields)
- // \ | POProject[Bag]
+ // AND Const[Bag](bag with null fields)
+ // \ | POProject[Bag]
// \ | /
// POBinCond
POProject relationProject = (POProject) fePlan.getRoots().get(0);
try {
-
+
// condition of the bincond
POProject relationProjectForIsEmpty = relationProject.clone();
fePlan.add(relationProjectForIsEmpty);
@@ -92,7 +92,7 @@ public class CompilerUtils {
fePlan.connect(relationProjectForIsEmpty, isEmpty);
ExpressionOperator cond;
- if (skewedJoin) {
+ if (skewedRightOuterJoin) {
POProject projectForKey = new POProject(new
OperatorKey(scope,NodeIdGenerator.getGenerator().getNextNodeId(scope)));
projectForKey.setColumn(0);
projectForKey.setOverloaded(false);
@@ -119,7 +119,7 @@ public class CompilerUtils {
} else {
cond = isEmpty;
}
-
+
// lhs of bincond (const bag with null fields)
ConstantExpression ce = new ConstantExpression(new
OperatorKey(scope,
NodeIdGenerator.getGenerator().getNextNodeId(scope)));
@@ -136,7 +136,7 @@ public class CompilerUtils {
ce.setResultType(DataType.BAG);
//this operator doesn't have any predecessors
fePlan.add(ce);
-
+
//rhs of bincond is the original project
// let's set up the bincond now
POBinCond bincond = new POBinCond(new OperatorKey(scope,
@@ -154,7 +154,7 @@ public class CompilerUtils {
} catch (Exception e) {
throw new PlanException("Error setting up outerjoin", e);
}
-
+
}
}
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java?rev=1725329&r1=1725328&r2=1725329&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java Mon Jan 18
19:47:48 2016
@@ -71,14 +71,14 @@ public class TezDAGStats extends JobStat
public static final String PIG_COUNTER_GROUP =
org.apache.pig.PigCounters.class.getName();
public static final String SUCCESS_HEADER = String.format("VertexId
Parallelism TotalTasks"
- + " %1$14s %2$14s %3$14s %4$16s %5$14s %6$16s"
+ + " %1$14s %2$20s %3$14s %4$14s %5$16s %6$14s %7$16s"
+ " Alias\tFeature\tOutputs",
- "InputRecords", "OutputRecords", "FileBytesRead",
"FileBytesWritten", "HdfsBytesRead", "HdfsBytesWritten");
+ "InputRecords", "ReduceInputRecords", "OutputRecords",
"FileBytesRead", "FileBytesWritten", "HdfsBytesRead", "HdfsBytesWritten");
public static final String FAILURE_HEADER = String.format("VertexId State
Parallelism TotalTasks"
- + " %1$14s %2$14s %3$14s %4$16s %5$14s %6$16s"
+ + " %1$14s %2$20s %3$14s %4$14s %5$16s %6$14s %7$16s"
+ " Alias\tFeature\tOutputs",
- "InputRecords", "OutputRecords", "FileBytesRead",
"FileBytesWritten", "HdfsBytesRead", "HdfsBytesWritten");
+ "InputRecords", "ReduceInputRecords", "OutputRecords",
"FileBytesRead", "FileBytesWritten", "HdfsBytesRead", "HdfsBytesWritten");
private Map<String, TezVertexStats> tezVertexStatsMap;
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java?rev=1725329&r1=1725328&r2=1725329&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java
(original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java Mon Jan
18 19:47:48 2016
@@ -73,6 +73,7 @@ public class TezVertexStats extends JobS
private int numTasks = 0;
private long numInputRecords = 0;
+ private long numReduceInputRecords = 0;
private long numOutputRecords = 0;
private long fileBytesRead = 0;
private long fileBytesWritten = 0;
@@ -111,6 +112,7 @@ public class TezVertexStats extends JobS
sb.append(String.format("%9s ", parallelism));
sb.append(String.format("%10s ", numTasks));
sb.append(String.format("%14s ", numInputRecords));
+ sb.append(String.format("%20s ", numReduceInputRecords));
sb.append(String.format("%14s ", numOutputRecords));
sb.append(String.format("%14s ", fileBytesRead));
sb.append(String.format("%16s ", fileBytesWritten));
@@ -213,6 +215,19 @@ public class TezVertexStats extends JobS
}
public void addInputStatistics() {
+
+ long inputRecords = -1;
+ Map<String, Long> taskCounters = counters.get(TASK_COUNTER_GROUP);
+ if (taskCounters != null) {
+ if (taskCounters.get(TaskCounter.INPUT_RECORDS_PROCESSED.name())
!= null) {
+ inputRecords =
taskCounters.get(TaskCounter.INPUT_RECORDS_PROCESSED.name());
+ numInputRecords = inputRecords;
+ }
+ if (taskCounters.get(TaskCounter.REDUCE_INPUT_RECORDS.name()) !=
null) {
+ numReduceInputRecords =
taskCounters.get(TaskCounter.REDUCE_INPUT_RECORDS.name());
+ }
+ }
+
if (loads == null) {
return;
}
@@ -233,17 +248,12 @@ public class TezVertexStats extends JobS
if (n != null) records = n;
}
if (records == -1) {
- Map<String, Long> taskCounters =
counters.get(TASK_COUNTER_GROUP);
- if (taskCounters != null
- &&
taskCounters.get(TaskCounter.INPUT_RECORDS_PROCESSED.name()) != null) {
- records =
taskCounters.get(TaskCounter.INPUT_RECORDS_PROCESSED.name());
- }
+ records = inputRecords;
}
if (isSuccessful() && records == -1) {
// Tez removes 0 value counters for efficiency.
records = 0;
}
- numInputRecords = records;
if (counters.get(FS_COUNTER_GROUP) != null &&
counters.get(FS_COUNTER_GROUP).get(PigStatsUtil.HDFS_BYTES_READ) != null) {
hdfsBytesRead =
counters.get(FS_COUNTER_GROUP).get(PigStatsUtil.HDFS_BYTES_READ);
@@ -257,6 +267,16 @@ public class TezVertexStats extends JobS
}
public void addOutputStatistics() {
+
+ long outputRecords = -1;
+
+ Map<String, Long> taskCounters = counters.get(TASK_COUNTER_GROUP);
+ if (taskCounters != null
+ && taskCounters.get(TaskCounter.OUTPUT_RECORDS.name()) !=
null) {
+ outputRecords =
taskCounters.get(TaskCounter.OUTPUT_RECORDS.name());
+ numOutputRecords = outputRecords;
+ }
+
if (stores == null) {
return;
}
@@ -279,19 +299,12 @@ public class TezVertexStats extends JobS
if (n != null) records = n;
}
if (records == -1) {
- Map<String, Long> taskCounters =
counters.get(TASK_COUNTER_GROUP);
- if (taskCounters != null
- &&
taskCounters.get(TaskCounter.OUTPUT_RECORDS.name()) != null) {
- records =
taskCounters.get(TaskCounter.OUTPUT_RECORDS.name());
- }
+ records = outputRecords;
}
if (isSuccessful() && records == -1) {
// Tez removes 0 value counters for efficiency.
records = 0;
}
- if (records != -1) {
- numOutputRecords += records;
- }
}
/* TODO: Need to check FILE_BYTES_WRITTEN for local mode */
if (!sto.isMultiStore() && counters.get(FS_COUNTER_GROUP)!= null &&
Modified: pig/trunk/test/e2e/pig/tests/nightly.conf
URL:
http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/nightly.conf?rev=1725329&r1=1725328&r2=1725329&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tests/nightly.conf (original)
+++ pig/trunk/test/e2e/pig/tests/nightly.conf Mon Jan 18 19:47:48 2016
@@ -3125,6 +3125,37 @@ c = foreach b generate flatten($0);
store c into ':OUTPATH:';\,
},
+ # left outer join with fixed memory
+ {
+ 'num' => 13,
+ 'java_params' =>
['-Dpig.skewedjoin.reduce.maxtuple=100 -Dpig.skewedjoin.reduce.mem=516947966'],
+ 'pig' => q\a = load
':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa);
+b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration,
contributions);
+b = filter b by name < 'b';
+e = join a by name left outer, b by name using 'skewed' parallel 8;
+store e into ':OUTPATH:';\,
+ 'verify_pig_script' => q\a = load
':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa);
+b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration,
contributions);
+b = filter b by name < 'b';
+e = join a by name left outer, b by name ;
+store e into ':OUTPATH:';\,
+ },
+ # full outer join with fixed memory
+ {
+ 'num' => 14,
+ 'java_params' =>
['-Dpig.skewedjoin.reduce.maxtuple=100 -Dpig.skewedjoin.reduce.mem=516947966'],
+ 'pig' => q\a = load
':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa);
+b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration,
contributions);
+b = filter b by name > 'm';
+e = join a by name full outer, b by name using 'skewed' parallel 8;
+store e into ':OUTPATH:';\,
+ 'verify_pig_script' => q\a = load
':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa);
+b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration,
contributions);
+b = filter b by name > 'm';
+e = join a by name full outer, b by name ;
+store e into ':OUTPATH:';\,
+
+ },
]
},