Author: rohini
Date: Wed Jan 11 15:06:05 2017
New Revision: 1778311
URL: http://svn.apache.org/viewvc?rev=1778311&view=rev
Log:
PIG-5043: Slowstart not applied in Tez with PARALLEL clause (rohini)
Modified:
pig/branches/branch-0.16/CHANGES.txt
pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
pig/branches/branch-0.16/test/org/apache/pig/tez/TestTezJobControlCompiler.java
Modified: pig/branches/branch-0.16/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/CHANGES.txt?rev=1778311&r1=1778310&r2=1778311&view=diff
==============================================================================
--- pig/branches/branch-0.16/CHANGES.txt (original)
+++ pig/branches/branch-0.16/CHANGES.txt Wed Jan 11 15:06:05 2017
@@ -32,6 +32,8 @@ OPTIMIZATIONS
BUG FIXES
+PIG-5043: Slowstart not applied in Tez with PARALLEL clause (rohini)
+
PIG-4930: Skewed Join Breaks On Empty Sampled Input When Key is From Map
(nkollar via rohini)
PIG-3417: Job fails when skewed join is done on tuple key (nkollar via rohini)
Modified: pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1778311&r1=1778310&r2=1778311&view=diff
==============================================================================
---
pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
(original)
+++
pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
Wed Jan 11 15:06:05 2017
@@ -23,9 +23,11 @@ import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
@@ -174,6 +176,7 @@ public class TezDagBuilder extends TezOp
private PigContext pc;
private Configuration globalConf;
private Configuration pigContextConf;
+ private Configuration shuffleVertexManagerBaseConf;
private FileSystem fs;
private long intermediateTaskInputSize;
private Set<String> inputSplitInDiskVertices;
@@ -217,6 +220,16 @@ public class TezDagBuilder extends TezOp
this.pigContextConf =
ConfigurationUtil.toConfiguration(pc.getProperties(), false);
MRToTezHelper.processMRSettings(pigContextConf, globalConf);
+ shuffleVertexManagerBaseConf = new Configuration(false);
+ // Only copy tez.shuffle-vertex-manager config to keep payload size
small
+ Iterator<Entry<String, String>> iter = pigContextConf.iterator();
+ while (iter.hasNext()) {
+ Entry<String, String> entry = iter.next();
+ if (entry.getKey().startsWith("tez.shuffle-vertex-manager")) {
+ shuffleVertexManagerBaseConf.set(entry.getKey(),
entry.getValue());
+ }
+ }
+
// Add credentials from binary token file and get tokens for namenodes
// specified in mapreduce.job.hdfs-servers
SecurityHelper.populateTokenCache(globalConf, dag.getCredentials());
@@ -265,7 +278,7 @@ public class TezDagBuilder extends TezOp
if (globalConf.get(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS) == null)
{
// If tez setting is not defined
MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, mapTaskEnv, true);
- MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, reduceTaskEnv,
true);
+ MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, reduceTaskEnv,
false);
}
if (globalConf.get(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS) != null)
{
@@ -778,6 +791,21 @@ public class TezDagBuilder extends TezOp
String vmPluginName = null;
Configuration vmPluginConf = null;
+ boolean containScatterGather = false;
+ boolean containCustomPartitioner = false;
+ for (TezEdgeDescriptor edge : tezOp.inEdges.values()) {
+ if (edge.dataMovementType == DataMovementType.SCATTER_GATHER) {
+ containScatterGather = true;
+ }
+ if (edge.partitionerClass != null) {
+ containCustomPartitioner = true;
+ }
+ }
+
+ if(containScatterGather) {
+ vmPluginName = ShuffleVertexManager.class.getName();
+ vmPluginConf = new Configuration(shuffleVertexManagerBaseConf);
+ }
// Set the right VertexManagerPlugin
if (tezOp.getEstimatedParallelism() != -1) {
@@ -792,31 +820,8 @@ public class TezDagBuilder extends TezOp
log.info("Set VertexManagerPlugin to
PartitionerDefinedParallelismVertexManager for vertex " +
tezOp.getOperatorKey().toString());
}
} else {
- boolean containScatterGather = false;
- boolean containCustomPartitioner = false;
- for (TezEdgeDescriptor edge : tezOp.inEdges.values()) {
- if (edge.dataMovementType ==
DataMovementType.SCATTER_GATHER) {
- containScatterGather = true;
- }
- if (edge.partitionerClass!=null) {
- containCustomPartitioner = true;
- }
- }
if (containScatterGather && !containCustomPartitioner) {
- vmPluginConf = (vmPluginConf == null) ? new
Configuration(pigContextConf) : vmPluginConf;
- // Use auto-parallelism feature of ShuffleVertexManager to
dynamically
- // reduce the parallelism of the vertex
- if
(payloadConf.getBoolean(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM, true)
- &&
!TezOperPlan.getGrandParentsForGraceParallelism(getPlan(), tezOp).isEmpty()
- && tezOp.getCrossKeys() == null) {
- vmPluginName =
PigGraceShuffleVertexManager.class.getName();
- tezOp.setUseGraceParallelism(true);
- vmPluginConf.set("pig.tez.plan",
getSerializedTezPlan());
- vmPluginConf.set(PigImplConstants.PIG_CONTEXT,
serializedPigContext);
- } else {
- vmPluginName = ShuffleVertexManager.class.getName();
- }
-
vmPluginConf.setBoolean(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL,
true);
+
// For Intermediate reduce, set the bytes per reducer to
be block size.
long bytesPerReducer = intermediateTaskInputSize;
// If there are store statements, use
BYTES_PER_REDUCER_PARAM configured by user.
@@ -825,8 +830,8 @@ public class TezDagBuilder extends TezOp
// In Tez, numReducers=(map output size/bytesPerReducer)
we need lower values to avoid skews in reduce
// as map input sizes are mostly always high compared to
map output.
if (stores.size() > 0) {
- if
(vmPluginConf.get(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM) != null) {
- bytesPerReducer = vmPluginConf.getLong(
+ if
(pigContextConf.get(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM) != null)
{
+ bytesPerReducer = pigContextConf.getLong(
InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER);
} else if (tezOp.isGroupBy()) {
@@ -835,6 +840,20 @@ public class TezDagBuilder extends TezOp
bytesPerReducer =
SHUFFLE_BYTES_PER_REDUCER_DEFAULT;
}
}
+
+ // Use auto-parallelism feature of ShuffleVertexManager to
dynamically
+ // reduce the parallelism of the vertex. Use
PigGraceShuffleVertexManager
+ // instead of ShuffleVertexManager if
pig.tez.grace.parallelism is turned on
+ if
(payloadConf.getBoolean(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM, true)
+ &&
!TezOperPlan.getGrandParentsForGraceParallelism(getPlan(), tezOp).isEmpty()
+ && tezOp.getCrossKeys() == null) {
+ vmPluginName =
PigGraceShuffleVertexManager.class.getName();
+ tezOp.setUseGraceParallelism(true);
+ vmPluginConf.set("pig.tez.plan",
getSerializedTezPlan());
+ vmPluginConf.set(PigImplConstants.PIG_CONTEXT,
serializedPigContext);
+
vmPluginConf.setLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
bytesPerReducer);
+ }
+
vmPluginConf.setBoolean(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL,
true);
vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
bytesPerReducer);
log.info("Set auto parallelism for vertex " +
tezOp.getOperatorKey().toString());
}
Modified: pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java?rev=1778311&r1=1778310&r2=1778311&view=diff
==============================================================================
---
pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
(original)
+++
pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
Wed Jan 11 15:06:05 2017
@@ -50,6 +50,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.classification.InterfaceAudience;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
import org.apache.tez.mapreduce.hadoop.InputSplitInfoDisk;
@@ -102,7 +103,6 @@ public class MRToTezHelper {
mrReduceParamToTezVertexParamMap.put(MRJobConfig.REDUCE_SPECULATIVE,
TezConfiguration.TEZ_AM_SPECULATION_ENABLED);
mrReduceParamToTezVertexParamMap.put(MRJobConfig.REDUCE_LOG_LEVEL,
TezConfiguration.TEZ_TASK_LOG_LEVEL);
mrReduceParamToTezVertexParamMap.put("mapreduce.job.running.reduce.limit",
"tez.am.vertex.max-task-concurrency");
-
mrReduceParamToTezVertexParamMap.put("mapreduce.job.running.map.limit",
"tez.am.vertex.max-task-concurrency");
mrReduceParamToTezVertexParamMap.put(MRJobConfig.TASK_TIMEOUT,
"tez.am.progress.stuck.interval-ms");
}
@@ -259,6 +259,14 @@ public class MRToTezHelper {
JobControlCompiler.configureCompression(tezConf);
convertMRToTezConf(tezConf, mrConf,
DeprecatedKeys.getMRToTezRuntimeParamMap());
removeUnwantedSettings(tezConf, false);
+
+ // ShuffleVertexManager Plugin settings
+ // DeprecatedKeys.getMRToTezRuntimeParamMap() only translates min and
not max
+ String slowStartFraction =
mrConf.get(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART);
+ if (slowStartFraction != null) {
+
tezConf.setIfUnset(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION,
slowStartFraction);
+
tezConf.setIfUnset(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION,
slowStartFraction);
+ }
}
/**
Modified: pig/branches/branch-0.16/test/org/apache/pig/tez/TestTezJobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/test/org/apache/pig/tez/TestTezJobControlCompiler.java?rev=1778311&r1=1778310&r2=1778311&view=diff
==============================================================================
---
pig/branches/branch-0.16/test/org/apache/pig/tez/TestTezJobControlCompiler.java
(original)
+++
pig/branches/branch-0.16/test/org/apache/pig/tez/TestTezJobControlCompiler.java
Wed Jan 11 15:06:05 2017
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTru
import java.io.File;
import java.io.IOException;
+import java.lang.reflect.Field;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
@@ -36,12 +37,14 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.pig.PigConfiguration;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
+import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.tez.TezJobCompiler;
import org.apache.pig.backend.hadoop.executionengine.tez.TezLauncher;
@@ -50,6 +53,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
import
org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerNode;
+import
org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigGraceShuffleVertexManager;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.OperatorKey;
@@ -59,8 +63,11 @@ import org.apache.pig.test.junit.Ordered
import org.apache.pig.test.junit.OrderedJUnit4Runner.TestOrder;
import org.apache.pig.tools.pigstats.ScriptState;
import org.apache.pig.tools.pigstats.tez.TezScriptState;
+import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
+import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -79,7 +86,8 @@ import org.junit.runner.RunWith;
"testTezParallelismEstimatorFilterFlatten",
"testTezParallelismEstimatorHashJoin",
"testTezParallelismEstimatorSplitBranch",
- "testTezParallelismDefaultParallelism"
+ "testTezParallelismDefaultParallelism",
+ "testShuffleVertexManagerConfig"
})
public class TestTezJobControlCompiler {
private static PigContext pc;
@@ -292,6 +300,72 @@ public class TestTezJobControlCompiler {
TezOperator leafOper = compiledPlan.first.getLeaves().get(0);
Vertex leafVertex =
compiledPlan.second.getVertex(leafOper.getOperatorKey().toString());
assertEquals(leafVertex.getParallelism(), 5);
+ pc.defaultParallel = -1;
+ }
+
+ @Test
+ public void testShuffleVertexManagerConfig() throws Exception{
+
pc.getProperties().setProperty(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART,
"0.3");
+
pc.getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
"500");
+
+ try {
+
+ String query = "a = load '10' using " +
ArbitarySplitsLoader.class.getName()
+ + "() as (name:chararray, age:int, gpa:double);"
+ + "b = limit a 5;"
+ + "c = group b by name;"
+ + "store c into 'output';";
+
+ VertexManagerPluginDescriptor vmPlugin =
getLeafVertexVMPlugin(query);
+ Configuration vmPluginConf =
TezUtils.createConfFromUserPayload(vmPlugin.getUserPayload());
+
+ // Case of grace auto parallelism (PigGraceShuffleVertexManager)
+ assertEquals(PigGraceShuffleVertexManager.class.getName(),
vmPlugin.getClassName());
+ // min and max src fraction, auto parallel, desired size,
bytes.per.reducer, pig.tez.plan and pigcontext
+ assertEquals(7, vmPluginConf.size());
+ assertEquals("0.3",
vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION));
+ assertEquals("0.3",
vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION));
+ assertEquals("true",
vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL));
+ assertEquals("500",
vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE));
+ assertEquals("500",
vmPluginConf.get(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM));
+
+ // Case of auto parallelism (ShuffleVertexManager)
+
pc.getProperties().setProperty(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM,
"false");
+ vmPlugin = getLeafVertexVMPlugin(query);
+ vmPluginConf =
TezUtils.createConfFromUserPayload(vmPlugin.getUserPayload());
+ assertEquals(ShuffleVertexManager.class.getName(),
vmPlugin.getClassName());
+ // min and max src fraction, auto parallel, desired size
+ assertEquals(4, vmPluginConf.size());
+ assertEquals("0.3",
vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION));
+ assertEquals("0.3",
vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION));
+ assertEquals("true",
vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL));
+ assertEquals("500",
vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE));
+
+ // Case of default parallel or PARALLEL (ShuffleVertexManager)
+ pc.defaultParallel = 2;
+ vmPlugin = getLeafVertexVMPlugin(query);
+ vmPluginConf =
TezUtils.createConfFromUserPayload(vmPlugin.getUserPayload());
+ assertEquals(ShuffleVertexManager.class.getName(),
vmPlugin.getClassName());
+ // min and max src fraction
+ assertEquals(2, vmPluginConf.size());
+ assertEquals("0.3",
vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION));
+ assertEquals("0.3",
vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION));
+ } finally {
+
pc.getProperties().remove(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART);
+
pc.getProperties().remove(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM);
+
pc.getProperties().remove(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM);
+ pc.defaultParallel = -1;
+ }
+ }
+
+ private VertexManagerPluginDescriptor getLeafVertexVMPlugin(String query)
throws Exception {
+ Pair<TezOperPlan, DAG> compiledPlan = compile(query);
+ TezOperator leafOper = compiledPlan.first.getLeaves().get(0);
+ Vertex leafVertex =
compiledPlan.second.getVertex(leafOper.getOperatorKey().toString());
+ Field vmPluginField =
Vertex.class.getDeclaredField("vertexManagerPlugin");
+ vmPluginField.setAccessible(true);
+ VertexManagerPluginDescriptor vmPlugin =
(VertexManagerPluginDescriptor) vmPluginField.get(leafVertex);
+ return vmPlugin;
}
private Pair<TezOperPlan, DAG> compile(String query) throws Exception {