Author: rohini
Date: Tue Mar 22 10:44:07 2016
New Revision: 1736180
URL: http://svn.apache.org/viewvc?rev=1736180&view=rev
Log:
PIG-4840: Do not turn off UnionOptimizer for unsupported storefuncs in case of
no vertex groups (rohini)
Added:
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-SplitStore.gld
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java
pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java
Modified: pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1736180&r1=1736179&r2=1736180&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Mar 22 10:44:07 2016
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-4840: Do not turn off UnionOptimizer for unsupported storefuncs in case of
no vertex groups (rohini)
+
PIG-4843: Turn off combiner in reducer vertex for Tez if bags are in combine
plan (rohini)
PIG-4796: Authenticate with Kerberos using a keytab file (nielsbasjes via
daijy)
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java?rev=1736180&r1=1736179&r2=1736180&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
Tue Mar 22 10:44:07 2016
@@ -18,8 +18,10 @@
package org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
@@ -68,10 +70,23 @@ public class MultiQueryOptimizerTez exte
return;
}
+ // Using a list instead of a set to have consistently ordered plans
List<TezOperator> splittees = new ArrayList<TezOperator>();
Set<TezOperator> mergedNonPackageInputSuccessors = new
HashSet<TezOperator>();
+ // When there is a union successor with unsupported storefunc,
those splittees
+ // can only be merged into the split if all the union members will
be from the split
+ // This is to ensure that there are no vertex groups created with
unsupported storefunc.
+ Map<TezOperator, Set<OperatorKey>> tentativeMergeUnionMembers =
new HashMap<TezOperator, Set<OperatorKey>>();
+
List<TezOperator> successors = getPlan().getSuccessors(tezOp);
+
+ Set<OperatorKey> splitterAndSuccessorKeys = new
HashSet<OperatorKey>();
+ splitterAndSuccessorKeys.add(tezOp.getOperatorKey());
+ for (TezOperator successor : successors) {
+ splitterAndSuccessorKeys.add(successor.getOperatorKey());
+ }
+
for (TezOperator successor : successors) {
List<TezOperator> predecessors = new
ArrayList<TezOperator>(getPlan().getPredecessors(successor));
predecessors.remove(tezOp);
@@ -129,6 +144,7 @@ public class MultiQueryOptimizerTez exte
// Only in case of POPackage(POShuffleTezLoad) multiple inputs
can be handled from a Split
Set<TezOperator> nonPackageInputSuccessors = new
HashSet<TezOperator>();
boolean canMerge = true;
+ Set<TezOperator> successorUnsupportedStoreUnions = new
HashSet<TezOperator>();
mergedSuccessors.addAll(successors);
for (TezOperator splittee : splittees) {
@@ -140,11 +156,21 @@ public class MultiQueryOptimizerTez exte
for (TezOperator succSuccessor :
getPlan().getSuccessors(successor)) {
if (succSuccessor.isUnion()) {
if (!(unionOptimizerOn &&
- UnionOptimizer.isOptimizable(succSuccessor,
- unionSupportedStoreFuncs,
- unionUnsupportedStoreFuncs))) {
+
UnionOptimizer.isOptimizable(succSuccessor))) {
toNotMergeSuccessors.add(succSuccessor);
} else {
+ if (unionOptimizerOn &&
!UnionOptimizer.isOptimizableStoreFunc(succSuccessor,unionSupportedStoreFuncs,unionUnsupportedStoreFuncs))
{
+ // This optimization of using
UnionOptimizer for unsupported storefunc
+ // is only good for one level of split and
does not handle multiple levels of split.
+ Set<OperatorKey> unionMembers = new
HashSet<OperatorKey>(succSuccessor.getUnionMembers());
+
unionMembers.removeAll(splitterAndSuccessorKeys);
+ if(unionMembers.isEmpty()) {
+
successorUnsupportedStoreUnions.add(succSuccessor);
+ } else {
+
toNotMergeSuccessors.add(succSuccessor);
+ continue;
+ }
+ }
toMergeSuccessors.add(succSuccessor);
List<TezOperator> unionSuccessors =
getPlan().getSuccessors(succSuccessor);
if (unionSuccessors != null) {
@@ -187,11 +213,47 @@ public class MultiQueryOptimizerTez exte
mergedSuccessors.retainAll(toNotMergeSuccessors);
if (mergedSuccessors.isEmpty()) { // no shared edge after merge
- splittees.add(successor);
mergedNonPackageInputSuccessors.addAll(nonPackageInputSuccessors);
+ if (successorUnsupportedStoreUnions.isEmpty()) {
+ splittees.add(successor);
+ } else {
+ // If all other conditions were satisfied, but it had
a successor union
+ // with unsupported storefunc, keep it in the tentative
list
+ for (TezOperator unionOp :
successorUnsupportedStoreUnions) {
+ Set<OperatorKey> tentativeSuccessors =
tentativeMergeUnionMembers.get(unionOp);
+ if (tentativeSuccessors == null) {
+ tentativeSuccessors = new
HashSet<OperatorKey>();
+ tentativeMergeUnionMembers.put(unionOp,
tentativeSuccessors);
+ }
+
tentativeSuccessors.add(successor.getOperatorKey());
+ }
+ }
}
}
+ Set<TezOperator> spliteesToRemove = new HashSet<TezOperator>();
+
+ for (Entry<TezOperator, Set<OperatorKey>> entry :
tentativeMergeUnionMembers.entrySet()) {
+ Set<OperatorKey> unionMembers = new
HashSet<OperatorKey>(entry.getKey().getUnionMembers());
+ if (entry.getValue().containsAll(unionMembers)) {
+ // If all the union members were tentative splittees then
add them
+ for (OperatorKey key : entry.getValue()) {
+ TezOperator splittee = getPlan().getOperator(key);
+ if (!splittees.contains(splittee)) {
+ splittees.add(splittee);
+ }
+ }
+ } else {
+ for (OperatorKey key : entry.getValue()) {
+ spliteesToRemove.add(getPlan().getOperator(key));
+ }
+ }
+ }
+
+ for (TezOperator op : spliteesToRemove) {
+ splittees.remove(op);
+ }
+
if (splittees.size() == 0) {
return;
}
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java?rev=1736180&r1=1736179&r2=1736180&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
Tue Mar 22 10:44:07 2016
@@ -104,9 +104,7 @@ public class UnionOptimizer extends TezO
this.unsupportedStoreFuncs = unsupportedStoreFuncs;
}
- public static boolean isOptimizable(TezOperator tezOp,
- List<String> supportedStoreFuncs, List<String>
unsupportedStoreFuncs)
- throws VisitorException {
+ public static boolean isOptimizable(TezOperator tezOp) throws
VisitorException {
if((tezOp.isLimit() || tezOp.isLimitAfterSort()) &&
tezOp.getRequestedParallelism() == 1) {
return false;
}
@@ -116,6 +114,12 @@ public class UnionOptimizer extends TezO
if (tezOp.isRankCounter()) {
return false;
}
+ return true;
+ }
+
+ public static boolean isOptimizableStoreFunc(TezOperator tezOp,
+ List<String> supportedStoreFuncs, List<String>
unsupportedStoreFuncs)
+ throws VisitorException {
if (supportedStoreFuncs != null || unsupportedStoreFuncs != null) {
List<POStoreTez> stores =
PlanHelper.getPhysicalOperators(tezOp.plan, POStoreTez.class);
for (POStoreTez store : stores) {
@@ -148,7 +152,7 @@ public class UnionOptimizer extends TezO
return;
}
- if (!isOptimizable(tezOp, supportedStoreFuncs, unsupportedStoreFuncs))
{
+ if (!isOptimizable(tezOp)) {
return;
}
@@ -156,34 +160,40 @@ public class UnionOptimizer extends TezO
String scope = unionOp.getOperatorKey().scope;
PhysicalPlan unionOpPlan = unionOp.plan;
- // TODO: PIG-3856 Handle replicated join and skewed join sample.
- // Replicate join small table/skewed join sample that was broadcast to
union vertex
- // now needs to be broadcast to all the union predecessors. How do we
do that??
- // Wait for shared edge and do it or write multiple times??
- // For now don't optimize except in the case of Split where we need to
write only once
-
Set<OperatorKey> uniqueUnionMembers = new
HashSet<OperatorKey>(unionOp.getUnionMembers());
List<TezOperator> predecessors = new
ArrayList<TezOperator>(tezPlan.getPredecessors(unionOp));
List<TezOperator> successors = tezPlan.getSuccessors(unionOp) == null
? null
: new ArrayList<TezOperator>(tezPlan.getSuccessors(unionOp));
+ if (uniqueUnionMembers.size() != 1) {
+
+ if (!isOptimizableStoreFunc(tezOp, supportedStoreFuncs,
unsupportedStoreFuncs)) {
+ return;
+ }
- if (successors != null && uniqueUnionMembers.size() > 1) {
- for (TezOperator succ : successors) {
- for (TezOperator pred : predecessors) {
- if (succ.inEdges.containsKey(pred.getOperatorKey())) {
- // Stop here, we cannot convert the node into vertex
group
- // Otherwise, we will end up with a parallel edge
between pred
- // and succ
- return;
+ if (successors != null) {
+ for (TezOperator succ : successors) {
+ for (TezOperator pred : predecessors) {
+ if (succ.inEdges.containsKey(pred.getOperatorKey())) {
+ // Stop here, we cannot convert the node into
vertex group
+ // Otherwise, we will end up with a parallel edge
between pred
+ // and succ
+ return;
+ }
}
}
}
+
+ // TODO: PIG-3856 Handle replicated join and skewed join sample.
+ // Replicate join small table/skewed join sample that was
broadcast to union vertex
+ // now needs to be broadcast to all the union predecessors. How do
we do that??
+ // Wait for shared edge and do it or write multiple times??
+ // For now don't optimize except in the case of Split where we
need to write only once
+ if (predecessors.size() > unionOp.getUnionMembers().size()) {
+ return;
+ }
}
- if (predecessors.size() > unionOp.getUnionMembers().size()
- && uniqueUnionMembers.size() != 1) {
- return; // TODO: PIG-3856
- }
+
if (uniqueUnionMembers.size() == 1) {
// We actually don't need VertexGroup in this case. The multiple
// sub-plans of Split can write to same MROutput or the Tez
LogicalOutput
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java?rev=1736180&r1=1736179&r2=1736180&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java
(original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java Tue Mar
22 10:44:07 2016
@@ -23,6 +23,7 @@ import static org.apache.pig.tools.pigst
import java.io.IOException;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -35,6 +36,7 @@ import org.apache.pig.PigCounters;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.ObjectSerializer;
@@ -286,7 +288,14 @@ public class TezVertexStats extends JobS
multiStoreCounters.putAll(msGroup);
}
+ // Split followed by union will have multiple stores writing to the same
location
+ Map<String, POStore> uniqueOutputs = new HashMap<String, POStore>();
for (POStore sto : stores) {
+ POStoreTez store = (POStoreTez) sto;
+ uniqueOutputs.put(store.getOutputKey(), store);
+ }
+
+ for (POStore sto : uniqueOutputs.values()) {
if (sto.isTmpStore()) {
continue;
}
Added:
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-SplitStore.gld
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-SplitStore.gld?rev=1736180&view=auto
==============================================================================
---
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-SplitStore.gld
(added)
+++
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-SplitStore.gld
Tue Mar 22 10:44:07 2016
@@ -0,0 +1,143 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-79 -> Tez vertex scope-81,Tez vertex scope-98,Tez
vertex scope-122,Tez vertex scope-124,
+Tez vertex scope-98 -> Tez vertex scope-104,
+Tez vertex scope-122 -> Tez vertex scope-81,
+Tez vertex scope-81 -> Tez vertex scope-104,Tez vertex scope-126,
+Tez vertex scope-104
+Tez vertex scope-124 -> Tez vertex scope-126,
+Tez vertex scope-126
+
+Tez vertex scope-79
+# Plan on vertex
+POValueOutputTez - scope-80 -> [scope-122, scope-124, scope-81,
scope-98]
+|
+|---a: New For Each(false,false)[bag] - scope-7
+ | |
+ | Cast[int] - scope-2
+ | |
+ | |---Project[bytearray][0] - scope-1
+ | |
+ | Cast[chararray] - scope-5
+ | |
+ | |---Project[bytearray][1] - scope-4
+ |
+ |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-98
+# Plan on vertex
+POValueOutputTez - scope-106 -> [scope-104]
+|
+|---POValueInputTez - scope-99 <- scope-79
+Tez vertex scope-122
+# Plan on vertex
+Local Rearrange[tuple]{int}(false) - scope-67 -> scope-81
+| |
+| Project[int][0] - scope-63
+|
+|---POValueInputTez - scope-123 <- scope-79
+Tez vertex scope-81
+# Plan on vertex
+1-1: Split - scope-133
+| |
+| u3: Store(file:///tmp/output/u3:org.apache.pig.builtin.PigStorage) -
scope-135 -> scope-57
+| |
+| |---e: Filter[bag] - scope-43
+| | |
+| | Not[boolean] - scope-55
+| | |
+| | |---Or[boolean] - scope-54
+| | |
+| | |---Or[boolean] - scope-50
+| | | |
+| | | |---Greater Than[boolean] - scope-46
+| | | | |
+| | | | |---Project[int][0] - scope-44
+| | | | |
+| | | | |---Constant(5) - scope-45
+| | | |
+| | | |---Equal To[boolean] - scope-49
+| | | |
+| | | |---Project[int][0] - scope-47
+| | | |
+| | | |---Constant(7) - scope-48
+| | |
+| | |---Equal To[boolean] - scope-53
+| | |
+| | |---Project[int][0] - scope-51
+| | |
+| | |---Constant(8) - scope-52
+| |
+| d: Split - scope-132
+| | |
+| | u3: Store(file:///tmp/output/u3:org.apache.pig.builtin.PigStorage) -
scope-134 -> scope-57
+| | |
+| | POValueOutputTez - scope-128 -> [scope-126]
+| | |
+| | |---j1: New For Each(false,false)[bag] - scope-74
+| | | |
+| | | Project[int][0] - scope-70
+| | | |
+| | | Project[chararray][1] - scope-72
+| | |
+| | |---j1: FRJoin[tuple] - scope-64 <- scope-122
+| | | |
+| | | Project[int][0] - scope-62
+| | | |
+| | | Project[int][0] - scope-63
+| |
+| |---d: Filter[bag] - scope-36
+| | |
+| | Equal To[boolean] - scope-39
+| | |
+| | |---Project[int][0] - scope-37
+| | |
+| | |---Constant(8) - scope-38
+| |
+| b: Split - scope-130
+| | |
+| | u1: Store(file:///tmp/output/u1:org.apache.pig.builtin.PigStorage) -
scope-136 -> scope-27
+| | |
+| | POValueOutputTez - scope-107 -> [scope-104]
+| |
+| |---b: Filter[bag] - scope-12
+| | |
+| | Greater Than[boolean] - scope-15
+| | |
+| | |---Project[int][0] - scope-13
+| | |
+| | |---Constant(5) - scope-14
+| |
+| c: Split - scope-131
+| | |
+| | u1: Store(file:///tmp/output/u1:org.apache.pig.builtin.PigStorage) -
scope-137 -> scope-27
+| | |
+| | POValueOutputTez - scope-108 -> [scope-104]
+| |
+| |---c: Filter[bag] - scope-19
+| | |
+| | Equal To[boolean] - scope-22
+| | |
+| | |---Project[int][0] - scope-20
+| | |
+| | |---Constant(7) - scope-21
+|
+|---POValueInputTez - scope-82 <- scope-79
+Tez vertex scope-104
+# Plan on vertex
+u2: Store(file:///tmp/output/u2:org.apache.pig.builtin.PigStorage) - scope-35
+|
+|---POShuffledValueInputTez - scope-105 <- [scope-81, scope-98]
+Tez vertex scope-124
+# Plan on vertex
+POValueOutputTez - scope-129 -> [scope-126]
+|
+|---POValueInputTez - scope-125 <- scope-79
+Tez vertex scope-126
+# Plan on vertex
+u4: Store(file:///tmp/output/u4:org.apache.pig.builtin.PigStorage) - scope-78
+|
+|---POShuffledValueInputTez - scope-127 <- [scope-124, scope-81]
Modified: pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java?rev=1736180&r1=1736179&r2=1736180&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java Tue Mar 22 10:44:07
2016
@@ -661,6 +661,28 @@ public class TestTezCompiler {
"store c into 'file:///tmp/output' using " +
DummyStoreWithOutputFormat.class.getName() + "();";
// Plan should not have union optimization applied
run(query,
"test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore-OPTOFF.gld");
+
+ resetScope();
+ setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS,
PigStorage.class.getName());
+ setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS,
null);
+ query =
+ "a = load 'file:///tmp/input' as (x:int, y:chararray);" +
+ "split a into b if x > 5, c if x == 7, d if x == 8, e
otherwise;" +
+ "u1 = union onschema b, c;" +
+ "store u1 into 'file:///tmp/output/u1';" +
+ //TODO: multiple levels of split not merged
+ "u2 = union onschema a, b, c;" +
+ "store u2 into 'file:///tmp/output/u2';" +
+ "u3 = union onschema d, e;" +
+ "store u3 into 'file:///tmp/output/u3';" +
+ "j1 = join d by x, a by x using 'replicated';" +
+ "j1 = foreach j1 generate d::x as x, d::y as y;" +
+ "u4 = union onschema j1, a;" +
+ "store u4 into 'file:///tmp/output/u4';";
+
+ // Plan should have union optimization applied even for unsupported
storefunc
+ run(query,
"test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-SplitStore.gld");
+
// Restore the value
setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS,
oldSupported);
setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS,
oldUnSupported);