Author: rohini
Date: Tue Oct 6 21:19:35 2015
New Revision: 1707145
URL: http://svn.apache.org/viewvc?rev=1707145&view=rev
Log:
PIG-4691: [Pig on Tez] Support for whitelisting storefuncs for union
optimization (rohini)
Added:
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore-OPTOFF.gld
Modified:
pig/trunk/CHANGES.txt
pig/trunk/conf/pig.properties
pig/trunk/src/org/apache/pig/PigConfiguration.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
pig/trunk/src/pig-default.properties
pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java
Modified: pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1707145&r1=1707144&r2=1707145&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Oct 6 21:19:35 2015
@@ -49,6 +49,8 @@ PIG-4639: Add better parser for Apache H
BUG FIXES
+PIG-4691: [Pig on Tez] Support for whitelisting storefuncs for union
optimization (rohini)
+
PIG-3957: Refactor out resetting input key in TezDagBuilder (rohini)
PIG-4688: Limit followed by POPartialAgg can give empty or partial results in
Tez (rohini)
Modified: pig/trunk/conf/pig.properties
URL:
http://svn.apache.org/viewvc/pig/trunk/conf/pig.properties?rev=1707145&r1=1707144&r2=1707145&view=diff
==============================================================================
--- pig/trunk/conf/pig.properties (original)
+++ pig/trunk/conf/pig.properties Tue Oct 6 21:19:35 2015
@@ -611,6 +611,8 @@ hcat.bin=/usr/local/hcat/bin/hcat
# output from different vertices into one final output location.
# If a StoreFunc's OutputCommitter does not work with multiple vertices
# writing to same location, then you can disable union optimization just
-# for that StoreFunc. Refer PIG-4649
+# for that StoreFunc. Refer PIG-4649. You can also specify a whitelist of StoreFuncs
+# that are known to work with multiple vertices writing to same location instead of a blacklist
#pig.tez.opt.union.unsupported.storefuncs=org.apache.hcatalog.pig.HCatStorer,org.apache.hive.hcatalog.pig.HCatStorer
+#pig.tez.opt.union.supported.storefuncs=
\ No newline at end of file
Modified: pig/trunk/src/org/apache/pig/PigConfiguration.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConfiguration.java?rev=1707145&r1=1707144&r2=1707145&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/trunk/src/org/apache/pig/PigConfiguration.java Tue Oct 6 21:19:35 2015
@@ -60,6 +60,13 @@ public class PigConfiguration {
* This key is used to enable or disable union optimization in tez. True
by default
*/
public static final String PIG_TEZ_OPT_UNION = "pig.tez.opt.union";
+ /**
+ * These keys are used to enable or disable tez union optimization for
+ * specific StoreFuncs so that optimization is only applied to StoreFuncs
+ * that do not hardcode part file names and honor mapreduce.output.basename and
+ * is turned off for those that do not. Refer PIG-4649
+ */
+ public static final String PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS =
"pig.tez.opt.union.supported.storefuncs";
public static final String PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS =
"pig.tez.opt.union.unsupported.storefuncs";
/**
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1707145&r1=1707144&r2=1707145&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
Tue Oct 6 21:19:35 2015
@@ -419,6 +419,12 @@ public class TezLauncher extends Launche
}
boolean isUnionOpt =
conf.getBoolean(PigConfiguration.PIG_TEZ_OPT_UNION, true);
+ List<String> supportedStoreFuncs = null;
+ String unionSupported =
conf.get(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS);
+ if (unionSupported != null && unionSupported.trim().length() > 0) {
+ supportedStoreFuncs = Arrays
+ .asList(StringUtils.split(unionSupported.trim()));
+ }
List<String> unionUnsupportedStoreFuncs = null;
String unionUnSupported =
conf.get(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS);
if (unionUnSupported != null && unionUnSupported.trim().length() > 0) {
@@ -430,7 +436,9 @@ public class TezLauncher extends Launche
if (isMultiQuery) {
// reduces the number of TezOpers in the Tez plan generated
// by multi-query (multi-store) script.
- MultiQueryOptimizerTez mqOptimizer = new
MultiQueryOptimizerTez(tezPlan, isUnionOpt, unionUnsupportedStoreFuncs);
+ MultiQueryOptimizerTez mqOptimizer = new MultiQueryOptimizerTez(
+ tezPlan, isUnionOpt, supportedStoreFuncs,
+ unionUnsupportedStoreFuncs);
mqOptimizer.visit();
}
@@ -443,7 +451,7 @@ public class TezLauncher extends Launche
// Use VertexGroup in Tez
if (isUnionOpt) {
- UnionOptimizer uo = new UnionOptimizer(tezPlan,
unionUnsupportedStoreFuncs);
+ UnionOptimizer uo = new UnionOptimizer(tezPlan,
supportedStoreFuncs, unionUnsupportedStoreFuncs);
uo.visit();
}
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java?rev=1707145&r1=1707144&r2=1707145&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
Tue Oct 6 21:19:35 2015
@@ -40,11 +40,15 @@ import org.apache.pig.impl.plan.VisitorE
public class MultiQueryOptimizerTez extends TezOpPlanVisitor {
private boolean unionOptimizerOn;
+ private List<String> unionSupportedStoreFuncs;
private List<String> unionUnsupportedStoreFuncs;
- public MultiQueryOptimizerTez(TezOperPlan plan, boolean unionOptimizerOn,
List<String> unionUnsupportedStoreFuncs) {
+ public MultiQueryOptimizerTez(TezOperPlan plan, boolean unionOptimizerOn,
+ List<String> unionSupportedStoreFuncs,
+ List<String> unionUnsupportedStoreFuncs) {
super(plan, new ReverseDependencyOrderWalker<TezOperator,
TezOperPlan>(plan));
this.unionOptimizerOn = unionOptimizerOn;
+ this.unionSupportedStoreFuncs = unionSupportedStoreFuncs;;
this.unionUnsupportedStoreFuncs = unionUnsupportedStoreFuncs;
}
@@ -128,8 +132,10 @@ public class MultiQueryOptimizerTez exte
if (getPlan().getSuccessors(successor) != null) {
for (TezOperator succSuccessor :
getPlan().getSuccessors(successor)) {
if (succSuccessor.isUnion()) {
- if (!(unionOptimizerOn
- &&
UnionOptimizer.isOptimizable(succSuccessor, unionUnsupportedStoreFuncs))) {
+ if (!(unionOptimizerOn &&
+ UnionOptimizer.isOptimizable(succSuccessor,
+ unionSupportedStoreFuncs,
+ unionUnsupportedStoreFuncs))) {
toMergeSuccessors.add(succSuccessor);
}
} else if (successors.contains(succSuccessor)) {
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java?rev=1707145&r1=1707144&r2=1707145&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
Tue Oct 6 21:19:35 2015
@@ -26,6 +26,9 @@ import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigConfiguration;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -43,7 +46,13 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput;
import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
+import org.apache.pig.backend.hadoop.hbase.HBaseStorage;
+import org.apache.pig.builtin.AvroStorage;
+import org.apache.pig.builtin.JsonStorage;
+import org.apache.pig.builtin.OrcStorage;
+import org.apache.pig.builtin.PigStorage;
import org.apache.pig.builtin.RoundRobinPartitioner;
+import org.apache.pig.builtin.mock.Storage;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.ReverseDependencyOrderWalker;
@@ -70,26 +79,57 @@ import org.apache.tez.runtime.library.ou
*/
public class UnionOptimizer extends TezOpPlanVisitor {
+ private static final Log LOG = LogFactory.getLog(UnionOptimizer.class);
private TezOperPlan tezPlan;
+ private static Set<String> builtinSupportedStoreFuncs = new
HashSet<String>();
+ private List<String> supportedStoreFuncs;
private List<String> unsupportedStoreFuncs;
- public UnionOptimizer(TezOperPlan plan, List<String>
unsupportedStoreFuncs) {
+ static {
+ builtinSupportedStoreFuncs.add(PigStorage.class.getName());
+ builtinSupportedStoreFuncs.add(JsonStorage.class.getName());
+ builtinSupportedStoreFuncs.add(OrcStorage.class.getName());
+ builtinSupportedStoreFuncs.add(HBaseStorage.class.getName());
+ builtinSupportedStoreFuncs.add(AvroStorage.class.getName());
+
builtinSupportedStoreFuncs.add("org.apache.pig.piggybank.storage.avro.AvroStorage");
+
builtinSupportedStoreFuncs.add("org.apache.pig.piggybank.storage.avro.CSVExcelStorage");
+ builtinSupportedStoreFuncs.add(Storage.class.getName());
+ }
+
+ public UnionOptimizer(TezOperPlan plan, List<String> supportedStoreFuncs,
List<String> unsupportedStoreFuncs) {
super(plan, new ReverseDependencyOrderWalker<TezOperator,
TezOperPlan>(plan));
tezPlan = plan;
+ this.supportedStoreFuncs = supportedStoreFuncs;
this.unsupportedStoreFuncs = unsupportedStoreFuncs;
}
- public static boolean isOptimizable(TezOperator tezOp, List<String>
unsupportedStoreFuncs)
+ public static boolean isOptimizable(TezOperator tezOp,
+ List<String> supportedStoreFuncs, List<String>
unsupportedStoreFuncs)
throws VisitorException {
if((tezOp.isLimit() || tezOp.isLimitAfterSort()) &&
tezOp.getRequestedParallelism() == 1) {
return false;
}
- if (unsupportedStoreFuncs != null) {
+ if (supportedStoreFuncs != null || unsupportedStoreFuncs != null) {
List<POStoreTez> stores =
PlanHelper.getPhysicalOperators(tezOp.plan, POStoreTez.class);
for (POStoreTez store : stores) {
- if
(unsupportedStoreFuncs.contains(store.getStoreFunc().getClass().getName())) {
+ String name = store.getStoreFunc().getClass().getName();
+ if (unsupportedStoreFuncs != null
+ && unsupportedStoreFuncs.contains(name)) {
return false;
}
+ if (supportedStoreFuncs != null
+ && !supportedStoreFuncs.contains(name)) {
+ if (!builtinSupportedStoreFuncs.contains(name)) {
+
LOG.warn(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS
+ + " does not contain " + name
+ + " and so disabling union optimization. There
will be some performance degradation. "
+ + "If your storefunc does not hardcode part
file names and can work with multiple vertices writing to the output location,"
+ + " run pig with -D"
+ +
PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS
+ + "=<Comma separated list of fully qualified
StoreFunc class names> to enable the optimization. Refer PIG-4691");
+ return false;
+ }
+ }
}
}
return true;
@@ -101,7 +141,7 @@ public class UnionOptimizer extends TezO
return;
}
- if (!isOptimizable(tezOp, unsupportedStoreFuncs)) {
+ if (!isOptimizable(tezOp, supportedStoreFuncs, unsupportedStoreFuncs))
{
return;
}
Modified: pig/trunk/src/pig-default.properties
URL:
http://svn.apache.org/viewvc/pig/trunk/src/pig-default.properties?rev=1707145&r1=1707144&r2=1707145&view=diff
==============================================================================
--- pig/trunk/src/pig-default.properties (original)
+++ pig/trunk/src/pig-default.properties Tue Oct 6 21:19:35 2015
@@ -59,4 +59,4 @@ pig.output.committer.recovery.support=fa
pig.stats.output.size.reader=org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.FileBasedOutputSizeReader
pig.stats.output.size.reader.unsupported=org.apache.pig.builtin.mock.Storage,org.apache.hcatalog.pig.HCatStorer,org.apache.hive.hcatalog.pig.HCatStorer,org.apache.pig.piggybank.storage.DBStorage
-pig.tez.opt.union.unsupported.storefuncs=org.apache.hcatalog.pig.HCatStorer,org.apache.hive.hcatalog.pig.HCatStorer
\ No newline at end of file
+pig.tez.opt.union.unsupported.storefuncs=org.apache.hcatalog.pig.HCatStorer,org.apache.hive.hcatalog.pig.HCatStorer,org.apache.pig.piggybank.storage.DBStorage,org.apache.pig.piggybank.storage.MultiStorage
Added:
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore-OPTOFF.gld
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore-OPTOFF.gld?rev=1707145&view=auto
==============================================================================
---
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore-OPTOFF.gld
(added)
+++
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore-OPTOFF.gld
Tue Oct 6 21:19:35 2015
@@ -0,0 +1,45 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-18 -> Tez vertex scope-20,
+Tez vertex scope-19 -> Tez vertex scope-20,
+Tez vertex scope-20
+
+Tez vertex scope-18
+# Plan on vertex
+POValueOutputTez - scope-22 -> [scope-20]
+|
+|---a: New For Each(false,false)[bag] - scope-7
+ | |
+ | Cast[int] - scope-2
+ | |
+ | |---Project[bytearray][0] - scope-1
+ | |
+ | Cast[chararray] - scope-5
+ | |
+ | |---Project[bytearray][1] - scope-4
+ |
+ |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-19
+# Plan on vertex
+POValueOutputTez - scope-23 -> [scope-20]
+|
+|---c: New For Each(false,false)[bag] - scope-15
+ | |
+ | Cast[int] - scope-10
+ | |
+ | |---Project[bytearray][1] - scope-9
+ | |
+ | Cast[chararray] - scope-13
+ | |
+ | |---Project[bytearray][0] - scope-12
+ |
+ |---b: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-8
+Tez vertex scope-20
+# Plan on vertex
+c:
Store(file:///tmp/output:org.apache.pig.test.TestMultiQueryBasic$DummyStoreWithOutputFormat)
- scope-17
+|
+|---POShuffledValueInputTez - scope-21 <- [scope-18, scope-19]
Modified: pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java?rev=1707145&r1=1707144&r2=1707145&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java Tue Oct 6 21:19:35
2015
@@ -33,9 +33,11 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.tez.TezLocalExecType;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainer;
import
org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerPrinter;
+import org.apache.pig.builtin.OrcStorage;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.test.TestMultiQueryBasic.DummyStoreWithOutputFormat;
import org.apache.pig.test.Util;
import org.apache.pig.test.utils.TestHelper;
import org.junit.AfterClass;
@@ -514,12 +516,24 @@ public class TestTezCompiler {
"store c into 'file:///tmp/output';";
setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
- String oldConfigValue =
getProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS);
+ String oldSupported =
getProperty(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS);
+ String oldUnSupported =
getProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS);
setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS,
PigStorage.class.getName());
// Plan should not have union optimization applied
run(query,
"test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-OPTOFF.gld");
+ resetScope();
+ setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS,
null);
+ setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS,
OrcStorage.class.getName());
+ query =
+ "a = load 'file:///tmp/input' as (x:int, y:chararray);" +
+ "b = load 'file:///tmp/input' as (y:chararray, x:int);" +
+ "c = union onschema a, b;" +
+ "store c into 'file:///tmp/output' using " +
DummyStoreWithOutputFormat.class.getName() + "();";
+ // Plan should not have union optimization applied
+ run(query,
"test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore-OPTOFF.gld");
// Restore the value
- setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS,
oldConfigValue);
+ setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS,
oldSupported);
+ setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS,
oldUnSupported);
}
@Test