Author: daijy
Date: Wed May 13 23:33:48 2015
New Revision: 1679298
URL: http://svn.apache.org/r1679298
Log:
PIG-4523: Tez engine should use Tez config rather than MR config whenever
possible
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
Modified: pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1679298&r1=1679297&r2=1679298&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed May 13 23:33:48 2015
@@ -76,6 +76,8 @@ PIG-4333: Split BigData tests into multi
BUG FIXES
+PIG-4523: Tez engine should use tez config rather than mr config whenever
possible (daijy)
+
PIG-4452: Embedded SQL using "SQL" instead of "sql" fails with string index
out of range: -1 error (daijy)
PIG-4543: TestEvalPipelineLocal.testRankWithEmptyReduce fail on Hadoop 1
(daijy)
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1679298&r1=1679297&r2=1679298&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
Wed May 13 23:33:48 2015
@@ -49,6 +49,7 @@ import org.apache.hadoop.mapreduce.v2.ut
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.pig.PigConfiguration;
import org.apache.pig.PigException;
@@ -145,6 +146,7 @@ import org.apache.tez.dag.api.InputIniti
import org.apache.tez.dag.api.OutputCommitterDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.VertexGroup;
@@ -735,8 +737,18 @@ public class TezDagBuilder extends TezOp
if (tezOp.isUseGraceParallelism()) {
parallel = -1;
}
- Vertex vertex = Vertex.create(tezOp.getOperatorKey().toString(),
procDesc, parallel,
- tezOp.isUseMRMapSettings() ?
MRHelpers.getResourceForMRMapper(globalConf) :
MRHelpers.getResourceForMRReducer(globalConf));
+ Resource resource;
+ if (globalConf.get(TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB)!=null
&&
+
globalConf.get(TezConfiguration.TEZ_TASK_RESOURCE_CPU_VCORES)!=null) {
+ resource =
Resource.newInstance(globalConf.getInt(TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB,
+ TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB_DEFAULT),
+
globalConf.getInt(TezConfiguration.TEZ_TASK_RESOURCE_CPU_VCORES,
+ TezConfiguration.TEZ_TASK_RESOURCE_CPU_VCORES_DEFAULT));
+ } else {
+ // If tez setting is not defined, try MR setting
+ resource = tezOp.isUseMRMapSettings() ?
MRHelpers.getResourceForMRMapper(globalConf) :
MRHelpers.getResourceForMRReducer(globalConf);
+ }
+ Vertex vertex = Vertex.create(tezOp.getOperatorKey().toString(),
procDesc, parallel, resource);
Map<String, String> taskEnv = new HashMap<String, String>();
MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, taskEnv,
tezOp.isUseMRMapSettings());
vertex.setTaskEnvironment(taskEnv);
@@ -751,8 +763,15 @@ public class TezDagBuilder extends TezOp
MRApps.setupDistributedCache(globalConf, localResources);
vertex.addTaskLocalFiles(localResources);
- vertex.setTaskLaunchCmdOpts(tezOp.isUseMRMapSettings() ?
MRHelpers.getJavaOptsForMRMapper(globalConf)
- : MRHelpers.getJavaOptsForMRReducer(globalConf));
+ String javaOpts;
+ if (globalConf.get(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS)!=null) {
+ javaOpts =
globalConf.get(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS);
+ } else {
+ // If tez setting is not defined, try MR setting
+ javaOpts = tezOp.isUseMRMapSettings() ?
MRHelpers.getJavaOptsForMRMapper(globalConf)
+ : MRHelpers.getJavaOptsForMRReducer(globalConf);
+ }
+ vertex.setTaskLaunchCmdOpts(javaOpts);
log.info("For vertex - " + tezOp.getOperatorKey().toString()
+ ": parallelism=" + tezOp.getVertexParallelism()