Author: daijy
Date: Sat Apr 18 03:30:19 2015
New Revision: 1674429
URL: http://svn.apache.org/r1674429
Log:
PIG-4434: Improve auto-parallelism for tez
Added:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigGraceShuffleVertexManager.java
pig/trunk/test/org/apache/pig/tez/TestTezGraceParallelism.java
Modified:
pig/trunk/CHANGES.txt
pig/trunk/conf/pig.properties
pig/trunk/ivy/libraries.properties
pig/trunk/src/org/apache/pig/PigConfiguration.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
pig/trunk/src/org/apache/pig/impl/plan/OperatorKey.java
pig/trunk/test/org/apache/pig/newplan/logical/optimizer/TestImplicitSplitOnTuple.java
pig/trunk/test/org/apache/pig/test/Util.java
pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java
pig/trunk/test/org/apache/pig/tez/TestTezJobControlCompiler.java
pig/trunk/test/org/apache/pig/tez/TestTezLauncher.java
Modified: pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1674429&r1=1674428&r2=1674429&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sat Apr 18 03:30:19 2015
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-4434: Improve auto-parallelism for tez (daijy)
+
PIG-4495: Better multi-query planning in case of multiple edges (rohini)
PIG-3294: Allow Pig use Hive UDFs (daijy)
Modified: pig/trunk/conf/pig.properties
URL:
http://svn.apache.org/viewvc/pig/trunk/conf/pig.properties?rev=1674429&r1=1674428&r2=1674429&view=diff
==============================================================================
--- pig/trunk/conf/pig.properties (original)
+++ pig/trunk/conf/pig.properties Sat Apr 18 03:30:19 2015
@@ -566,10 +566,11 @@ hcat.bin=/usr/local/hcat/bin/hcat
#
# opt.fetch=true
-# Enable auto parallelism in tez. This should be used by default unless
-# you encounter some bug in automatic parallelism. If set to false, use 1 as
-# default parallelism
+# Enable auto/grace parallelism in tez. These should be enabled by default unless
+# you encounter a bug in automatic parallelism. If pig.tez.auto.parallelism is
+# set to false, 1 is used as the default parallelism
pig.tez.auto.parallelism=true
+pig.tez.grace.parallelism=true
###########################################################################
#
Modified: pig/trunk/ivy/libraries.properties
URL:
http://svn.apache.org/viewvc/pig/trunk/ivy/libraries.properties?rev=1674429&r1=1674428&r2=1674429&view=diff
==============================================================================
--- pig/trunk/ivy/libraries.properties (original)
+++ pig/trunk/ivy/libraries.properties Sat Apr 18 03:30:19 2015
@@ -92,6 +92,6 @@ mockito.version=1.8.4
jansi.version=1.9
asm.version=3.3.1
snappy-java.version=1.1.0.1
-tez.version=0.6.0
+tez.version=0.7.0-SNAPSHOT
parquet-pig-bundle.version=1.2.3
snappy.version=0.2
Modified: pig/trunk/src/org/apache/pig/PigConfiguration.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConfiguration.java?rev=1674429&r1=1674428&r2=1674429&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/trunk/src/org/apache/pig/PigConfiguration.java Sat Apr 18 03:30:19 2015
@@ -117,6 +117,10 @@ public class PigConfiguration {
* This key is used to configure auto parallelism in tez. Default is true.
*/
public static final String PIG_TEZ_AUTO_PARALLELISM =
"pig.tez.auto.parallelism";
+ /**
+ * This key is used to configure grace parallelism in tez. Default is true.
+ */
+ public static final String PIG_TEZ_GRACE_PARALLELISM =
"pig.tez.grace.parallelism";
/**
* This key is used to configure compression for the pig input splits which
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1674429&r1=1674428&r2=1674429&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
Sat Apr 18 03:30:19 2015
@@ -113,6 +113,7 @@ import org.apache.pig.backend.hadoop.exe
import
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueInputTez;
import
org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez;
import
org.apache.pig.backend.hadoop.executionengine.tez.runtime.PartitionerDefinedVertexManager;
+import
org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigGraceShuffleVertexManager;
import
org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigOutputFormatTez;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigProcessor;
import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper;
@@ -392,6 +393,7 @@ public class TezDagBuilder extends TezOp
}
conf.setBoolean(MRConfiguration.MAPPER_NEW_API, true);
+ conf.setBoolean(MRConfiguration.REDUCER_NEW_API, true);
conf.set("pig.pigContext", ObjectSerializer.serialize(pc));
conf.set("udf.import.list",
ObjectSerializer.serialize(PigContext.getPackageImportList()));
@@ -437,6 +439,11 @@ public class TezDagBuilder extends TezOp
edge.dataSourceType, edge.schedulingType, out, in);
}
+ if (to.isUseGraceParallelism()) {
+ // Set the data movement (edge manager descriptor) to null to prevent vertex "to" from starting.
It will be started by PigGraceShuffleVertexManager
+ return EdgeProperty.create((EdgeManagerPluginDescriptor)null,
edge.dataSourceType,
+ edge.schedulingType, out, in);
+ }
return EdgeProperty.create(edge.dataMovementType, edge.dataSourceType,
edge.schedulingType, out, in);
}
@@ -458,6 +465,7 @@ public class TezDagBuilder extends TezOp
conf.set(MRJobConfig.COMBINE_CLASS_ATTR,
PigCombiner.Combine.class.getName());
conf.setBoolean(MRConfiguration.MAPPER_NEW_API, true);
+ conf.setBoolean(MRConfiguration.REDUCER_NEW_API, true);
conf.set("pig.pigContext", ObjectSerializer.serialize(pc));
conf.set("udf.import.list",
ObjectSerializer.serialize(PigContext.getPackageImportList()));
@@ -515,6 +523,7 @@ public class TezDagBuilder extends TezOp
ObjectSerializer.serialize(PigContext.getPackageImportList()));
payloadConf.set("exectype", "TEZ");
payloadConf.setBoolean(MRConfiguration.MAPPER_NEW_API, true);
+ payloadConf.setBoolean(MRConfiguration.REDUCER_NEW_API, true);
payloadConf.setClass(MRConfiguration.INPUTFORMAT_CLASS,
PigInputFormat.class, InputFormat.class);
@@ -657,9 +666,77 @@ public class TezDagBuilder extends TezOp
UserPayload userPayload =
TezUtils.createUserPayloadFromConf(payloadConf);
procDesc.setUserPayload(userPayload).setHistoryText(convertToHistoryText(tezOp.getOperatorKey().toString(),
payloadConf));
- Vertex vertex = Vertex.create(tezOp.getOperatorKey().toString(),
procDesc, tezOp.getVertexParallelism(),
- tezOp.isUseMRMapSettings() ?
MRHelpers.getResourceForMRMapper(globalConf) :
MRHelpers.getResourceForMRReducer(globalConf));
+ String vmPluginName = null;
+ Configuration vmPluginConf = null;
+
+ // Set the right VertexManagerPlugin
+ if (tezOp.getEstimatedParallelism() != -1) {
+ if (tezOp.isGlobalSort()||tezOp.isSkewedJoin()) {
+ // Set VertexManagerPlugin to PartitionerDefinedVertexManager,
which is able
+ // to decrease/increase parallelism of sorting vertex
dynamically
+ // based on the numQuantiles calculated by sample aggregation
vertex
+ vmPluginName = PartitionerDefinedVertexManager.class.getName();
+ log.info("Set VertexManagerPlugin to
PartitionerDefinedParallelismVertexManager for vertex " +
tezOp.getOperatorKey().toString());
+ } else {
+ boolean containScatterGather = false;
+ boolean containCustomPartitioner = false;
+ for (TezEdgeDescriptor edge : tezOp.inEdges.values()) {
+ if (edge.dataMovementType ==
DataMovementType.SCATTER_GATHER) {
+ containScatterGather = true;
+ }
+ if (edge.partitionerClass!=null) {
+ containCustomPartitioner = true;
+ }
+ }
+ if (containScatterGather && !containCustomPartitioner) {
+ // Use auto-parallelism feature of ShuffleVertexManager to
dynamically
+ // reduce the parallelism of the vertex
+ if
(payloadConf.getBoolean(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM, true)
+ &&
!TezOperPlan.getGrandParentsForGraceParallelism(getPlan(), tezOp).isEmpty()) {
+ vmPluginName =
PigGraceShuffleVertexManager.class.getName();
+ tezOp.setUseGraceParallelism(true);
+ } else {
+ vmPluginName = ShuffleVertexManager.class.getName();
+ }
+ vmPluginConf = (vmPluginConf == null) ?
ConfigurationUtil.toConfiguration(pc.getProperties(), false) : vmPluginConf;
+
vmPluginConf.setBoolean(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL,
true);
+ vmPluginConf.set("pig.tez.plan",
ObjectSerializer.serialize(getPlan()));
+ vmPluginConf.set("pig.pigContext",
ObjectSerializer.serialize(pc));
+ if (stores.size() <= 0) {
+ // Intermediate reduce. Set the bytes per reducer to
be block size.
+
vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
+ intermediateTaskInputSize);
+ } else if
(vmPluginConf.getLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
+
InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER) !=
+
InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER) {
+
vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
+
vmPluginConf.getLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
+
InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
+ }
+ log.info("Set auto parallelism for vertex " +
tezOp.getOperatorKey().toString());
+ }
+ }
+ }
+ if (tezOp.isLimit() && (vmPluginName == null ||
vmPluginName.equals(PigGraceShuffleVertexManager.class.getName())||
+ vmPluginName.equals(ShuffleVertexManager.class.getName()))) {
+ if
(tezOp.inEdges.values().iterator().next().inputClassName.equals(UnorderedKVInput.class.getName()))
{
+ // Setting SRC_FRACTION to 0.00001 so that even if there are
100K source tasks,
+ // limit job starts when 1 source task finishes.
+ // If limit is part of a group by or join because their
parallelism is 1,
+ // we should leave the configuration with the defaults.
+ vmPluginConf = (vmPluginConf == null) ?
ConfigurationUtil.toConfiguration(pc.getProperties(), false) : vmPluginConf;
+
vmPluginConf.set(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION,
"0.00001");
+
vmPluginConf.set(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION,
"0.00001");
+ log.info("Set " +
ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION + " to 0.00001
for limit vertex " + tezOp.getOperatorKey().toString());
+ }
+ }
+ int parallel = tezOp.getVertexParallelism();
+ if (tezOp.isUseGraceParallelism()) {
+ parallel = -1;
+ }
+ Vertex vertex = Vertex.create(tezOp.getOperatorKey().toString(),
procDesc, parallel,
+ tezOp.isUseMRMapSettings() ?
MRHelpers.getResourceForMRMapper(globalConf) :
MRHelpers.getResourceForMRReducer(globalConf));
Map<String, String> taskEnv = new HashMap<String, String>();
MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, taskEnv,
tezOp.isUseMRMapSettings());
vertex.setTaskEnvironment(taskEnv);
@@ -772,6 +849,7 @@ public class TezDagBuilder extends TezOp
new DataSinkDescriptor(storeOutDescriptor,
OutputCommitterDescriptor.create(MROutputCommitter.class.getName()),
dag.getCredentials()));
+ uniqueStoreOutputs.add(outputKey);
}
}
@@ -783,62 +861,6 @@ public class TezDagBuilder extends TezOp
new PigOutputFormat().checkOutputSpecs(job);
}
- String vmPluginName = null;
- Configuration vmPluginConf = null;
-
- // Set the right VertexManagerPlugin
- if (tezOp.getEstimatedParallelism() != -1) {
- if (tezOp.isGlobalSort()||tezOp.isSkewedJoin()) {
- // Set VertexManagerPlugin to PartitionerDefinedVertexManager,
which is able
- // to decrease/increase parallelism of sorting vertex
dynamically
- // based on the numQuantiles calculated by sample aggregation
vertex
- vmPluginName = PartitionerDefinedVertexManager.class.getName();
- log.info("Set VertexManagerPlugin to
PartitionerDefinedParallelismVertexManager for vertex " +
tezOp.getOperatorKey().toString());
- } else {
- boolean containScatterGather = false;
- boolean containCustomPartitioner = false;
- for (TezEdgeDescriptor edge : tezOp.inEdges.values()) {
- if (edge.dataMovementType ==
DataMovementType.SCATTER_GATHER) {
- containScatterGather = true;
- }
- if (edge.partitionerClass!=null) {
- containCustomPartitioner = true;
- }
- }
- if (containScatterGather && !containCustomPartitioner) {
- // Use auto-parallelism feature of ShuffleVertexManager to
dynamically
- // reduce the parallelism of the vertex
- vmPluginName = ShuffleVertexManager.class.getName();
- vmPluginConf = (vmPluginConf == null) ?
ConfigurationUtil.toConfiguration(pc.getProperties(), false) : vmPluginConf;
-
vmPluginConf.setBoolean(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL,
true);
- if (stores.size() <= 0) {
- // Intermediate reduce. Set the bytes per reducer to
be block size.
-
vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
- intermediateTaskInputSize);
- } else if
(vmPluginConf.getLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
-
InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER) !=
-
InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER) {
-
vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
-
vmPluginConf.getLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
-
InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
- }
- log.info("Set auto parallelism for vertex " +
tezOp.getOperatorKey().toString());
- }
- }
- }
- if (tezOp.isLimit() && (vmPluginName == null ||
vmPluginName.equals(ShuffleVertexManager.class.getName()))) {
- if
(tezOp.inEdges.values().iterator().next().inputClassName.equals(UnorderedKVInput.class.getName()))
{
- // Setting SRC_FRACTION to 0.00001 so that even if there are
100K source tasks,
- // limit job starts when 1 source task finishes.
- // If limit is part of a group by or join because their
parallelism is 1,
- // we should leave the configuration with the defaults.
- vmPluginName = ShuffleVertexManager.class.getName();
- vmPluginConf = (vmPluginConf == null) ?
ConfigurationUtil.toConfiguration(pc.getProperties(), false) : vmPluginConf;
-
vmPluginConf.set(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION,
"0.00001");
-
vmPluginConf.set(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION,
"0.00001");
- log.info("Set " +
ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION + " to 0.00001
for limit vertex " + tezOp.getOperatorKey().toString());
- }
- }
// else if(tezOp.isLimitAfterSort())
// TODO: PIG-4049 If standalone Limit we need a new VertexManager or
new input
// instead of ShuffledMergedInput. For limit part of the sort (order
by parallel 1) itself
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java?rev=1674429&r1=1674428&r2=1674429&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java
Sat Apr 18 03:30:19 2015
@@ -17,6 +17,8 @@
*/
package org.apache.pig.backend.hadoop.executionengine.tez.plan;
+import java.io.Serializable;
+
import org.apache.hadoop.mapreduce.Partitioner;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
@@ -28,7 +30,7 @@ import org.apache.tez.runtime.library.ou
/**
* Descriptor for Tez edge. It holds combine plan as well as edge properties.
*/
-public class TezEdgeDescriptor {
+public class TezEdgeDescriptor implements Serializable {
// Combiner runs on both input and output of Tez edge.
public PhysicalPlan combinePlan;
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java?rev=1674429&r1=1674428&r2=1674429&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java
Sat Apr 18 03:30:19 2015
@@ -51,11 +51,11 @@ public class TezOperPlan extends Operato
private static final long serialVersionUID = 1L;
- private Map<String, Path> extraResources = new HashMap<String, Path>();
+ private transient Map<String, Path> extraResources = new HashMap<String,
Path>();
private int estimatedTotalParallelism = -1;
- private Credentials creds;
+ private transient Credentials creds;
public TezOperPlan() {
creds = new Credentials();
@@ -242,5 +242,47 @@ public class TezOperPlan extends Operato
super.remove(node);
}
}
+
+ // This method is used in PigGraceShuffleVertexManager to get a list of
grandparents.
+ // Also need to exclude grandparents which are also parents (a is both a parent
and a grandparent in the diagram below)
+ // a -> c
+ // \ b /
+ //
+ public static List<TezOperator>
getGrandParentsForGraceParallelism(TezOperPlan tezPlan, TezOperator op) {
+ List<TezOperator> grandParents = new ArrayList<TezOperator>();
+ List<TezOperator> preds = tezPlan.getPredecessors(op);
+ if (preds != null) {
+ for (TezOperator pred : preds) {
+ if (pred.isVertexGroup()) {
+ grandParents.clear();
+ return grandParents;
+ }
+ List<TezOperator> predPreds = tezPlan.getPredecessors(pred);
+ if (predPreds!=null) {
+ for (TezOperator predPred : predPreds) {
+ if (predPred.isVertexGroup()) {
+ grandParents.clear();
+ return grandParents;
+ }
+ if (!grandParents.contains(predPred)) {
+ grandParents.add(predPred);
+ }
+ }
+ } else {
+ grandParents.clear();
+ break;
+ }
+ }
+
+ if (!grandParents.isEmpty()) {
+ for (TezOperator pred : preds) {
+ if (grandParents.contains(pred)) {
+ grandParents.remove(pred);
+ }
+ }
+ }
+ }
+ return grandParents;
+ }
}
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java?rev=1674429&r1=1674428&r2=1674429&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
Sat Apr 18 03:30:19 2015
@@ -18,9 +18,11 @@
package org.apache.pig.backend.hadoop.executionengine.tez.plan;
import java.io.ByteArrayOutputStream;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -30,6 +32,8 @@ import org.apache.pig.backend.hadoop.exe
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import
org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.TezOperDependencyParallelismEstimator.TezParallelismFactorVisitor;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigProcessor;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.Operator;
@@ -50,7 +54,11 @@ public class TezOperator extends Operato
private static final long serialVersionUID = 1L;
// Processor pipeline
- public PhysicalPlan plan;
+ // Note: TezOperator needs to be serialized and deserialized to
+ // be used in PigGraceShuffleVertexManager. Some fields are either
+ // big or not serializable, and are not used in
PigGraceShuffleVertexManager,
+ // so they are marked transient: plan, vertexGroupInfo, inputSplitInfo
+ public transient PhysicalPlan plan;
// Descriptors for out-bound edges.
public Map<OperatorKey, TezEdgeDescriptor> outEdges;
@@ -133,6 +141,12 @@ public class TezOperator extends Operato
private boolean useMRMapSettings = false;
+ private boolean useGraceParallelism = false;
+
+ private double parallelismFactor = -1;
+
+ private LinkedList<POStore> stores = null;
+
// Types of blocking operators. For now, we only support the following
ones.
public static enum OPER_FEATURE {
// Indicate if this job is a merge indexer
@@ -170,16 +184,16 @@ public class TezOperator extends Operato
private List<OperatorKey> vertexGroupMembers;
// For union
- private VertexGroupInfo vertexGroupInfo;
+ private transient VertexGroupInfo vertexGroupInfo;
// Mapping of OperatorKey of POStore OperatorKey to vertexGroup TezOperator
private Map<OperatorKey, OperatorKey> vertexGroupStores = null;
- public static class LoaderInfo {
+ public static class LoaderInfo implements Serializable {
private List<POLoad> loads = null;
private ArrayList<FileSpec> inp = new ArrayList<FileSpec>();
private ArrayList<String> inpSignatureLists = new ArrayList<String>();
private ArrayList<Long> inpLimits = new ArrayList<Long>();
- private InputSplitInfo inputSplitInfo = null;
+ private transient InputSplitInfo inputSplitInfo = null;
public List<POLoad> getLoads() {
return loads;
}
@@ -497,7 +511,7 @@ public class TezOperator extends Operato
public String toString() {
StringBuilder sb = new StringBuilder(name() + ":\n");
ByteArrayOutputStream baos = new ByteArrayOutputStream();
- if (!plan.isEmpty()) {
+ if (plan!=null && !plan.isEmpty()) {
plan.explain(baos);
String mp = new String(baos.toByteArray());
sb.append(shiftStringByTabs(mp, "| "));
@@ -602,6 +616,29 @@ public class TezOperator extends Operato
return loaderInfo;
}
+ public void setUseGraceParallelism(boolean useGraceParallelism) {
+ this.useGraceParallelism = useGraceParallelism;
+ }
+ public boolean isUseGraceParallelism() {
+ return useGraceParallelism;
+ }
+
+ public double getParallelismFactor() throws VisitorException {
+ if (parallelismFactor == -1) {
+ TezParallelismFactorVisitor parallelismFactorVisitor = new
TezParallelismFactorVisitor(plan, getOperatorKey().toString());
+ parallelismFactorVisitor.visit();
+ parallelismFactor = parallelismFactorVisitor.getFactor();
+ }
+ return parallelismFactor;
+ }
+
+ public LinkedList<POStore> getStores() throws VisitorException {
+ if (stores == null) {
+ stores = PlanHelper.getPhysicalOperators(plan, POStore.class);
+ }
+ return stores;
+ }
+
public static class VertexGroupInfo {
private List<OperatorKey> inputKeys;
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java?rev=1674429&r1=1674428&r2=1674429&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java
Sat Apr 18 03:30:19 2015
@@ -169,6 +169,10 @@ public class LoaderProcessor extends Tez
// Not using MRInputAMSplitGenerator because delegation tokens are
// fetched in FileInputFormat
tezOp.getLoaderInfo().setInputSplitInfo(MRInputHelpers.generateInputSplitsToMem(conf,
false, 0));
+ // TODO: Can be set to -1 if TEZ-601 gets fixed and getting input
+ // splits can be moved to if(loads) block below
+ int parallelism =
tezOp.getLoaderInfo().getInputSplitInfo().getNumTasks();
+ tezOp.setRequestedParallelism(parallelism);
}
return lds;
}
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java?rev=1674429&r1=1674428&r2=1674429&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
Sat Apr 18 03:30:19 2015
@@ -27,7 +27,6 @@ import org.apache.pig.PigConfiguration;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import
org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
@@ -82,15 +81,15 @@ public class ParallelismSetter extends T
// splits
int parallelism = -1;
boolean intermediateReducer = false;
- LinkedList<POStore> stores =
PlanHelper.getPhysicalOperators(tezOp.plan, POStore.class);
+ LinkedList<POStore> stores = tezOp.getStores();
if (stores.size() <= 0) {
intermediateReducer = true;
}
if (tezOp.getLoaderInfo().getLoads() != null &&
tezOp.getLoaderInfo().getLoads().size() > 0) {
- // TODO: Can be set to -1 if TEZ-601 gets fixed and getting
input
- // splits can be moved to if(loads) block below
- parallelism =
tezOp.getLoaderInfo().getInputSplitInfo().getNumTasks();
- tezOp.setRequestedParallelism(parallelism);
+ // requestedParallelism of Loader vertex is handled in
LoaderProcessor
+ // propagate it to vertexParallelism
+ tezOp.setVertexParallelism(tezOp.getRequestedParallelism());
+ return;
} else {
int prevParallelism = -1;
boolean isOneToOneParallelism = false;
@@ -107,7 +106,12 @@ public class ParallelismSetter extends T
+ tezOp.getOperatorKey().toString() + "
are not equal");
}
tezOp.setRequestedParallelism(pred.getRequestedParallelism());
-
tezOp.setEstimatedParallelism(pred.getEstimatedParallelism());
+ // If tezOp.estimatedParallelism is already set, don't
override it.
+ // The only such case is PigGraceShuffleVertexManager,
which
+ // sets the estimated parallelism according to the
output data size of the node
+ if (tezOp.getEstimatedParallelism()==-1) {
+
tezOp.setEstimatedParallelism(pred.getEstimatedParallelism());
+ }
isOneToOneParallelism = true;
incrementTotalParallelism(tezOp, parallelism);
parallelism = -1;
@@ -159,7 +163,7 @@ public class ParallelismSetter extends T
for (TezOperator pred :
mPlan.getPredecessors(tezOp)) {
if (pred.isSampleBasedPartitioner()) {
for (TezOperator partitionerPred :
mPlan.getPredecessors(pred)) {
- if
(partitionerPred.isSampleAggregation()) {
+ if
(partitionerPred.isSampleAggregation() && partitionerPred.plan!=null) {
LOG.debug("Updating
parallelism constant value to " + parallelism + " in " + partitionerPred.plan);
ParallelConstantVisitor
visitor =
new
ParallelConstantVisitor(partitionerPred.plan, parallelism);
Added:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java?rev=1674429&view=auto
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java
(added)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java
Sat Apr 18 03:30:19 2015
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer;
+
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+public class TezEstimatedParallelismClearer extends TezOpPlanVisitor{
+ public TezEstimatedParallelismClearer(TezOperPlan plan) {
+ super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
+ }
+
+ @Override
+ public void visitTezOp(TezOperator tezOp) throws VisitorException {
+ tezOp.setEstimatedParallelism(-1);
+ }
+}
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java?rev=1674429&r1=1674428&r2=1674429&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
Sat Apr 18 03:30:19 2015
@@ -120,10 +120,8 @@ public class TezOperDependencyParallelis
//For cases like Union we can just limit to sum of pred
vertices parallelism
boolean applyFactor = !tezOper.isUnion();
- if (pred.plan!=null && applyFactor) { // pred.plan can be null
if it is a VertexGroup
- TezParallelismFactorVisitor parallelismFactorVisitor = new
TezParallelismFactorVisitor(pred.plan, tezOper.getOperatorKey().toString());
- parallelismFactorVisitor.visit();
- predParallelism = predParallelism *
parallelismFactorVisitor.getFactor();
+ if (!pred.isVertexGroup() && applyFactor) {
+ predParallelism = predParallelism *
pred.getParallelismFactor();
}
estimatedParallelism += predParallelism;
}
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java?rev=1674429&r1=1674428&r2=1674429&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java
Sat Apr 18 03:30:19 2015
@@ -24,6 +24,8 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.dag.api.VertexManagerPluginContext;
@@ -68,7 +70,7 @@ public class PartitionerDefinedVertexMan
}
@Override
- public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
+ public void onVertexManagerEventReceived(VertexManagerEvent vmEvent)
throws Exception {
// There could be multiple partition vertex sending VertexManagerEvent
// Only need to setVertexParallelism once
if (isParallelismSet) {
@@ -86,14 +88,14 @@ public class PartitionerDefinedVertexMan
if (dynamicParallelism!=currentParallelism) {
LOG.info("Pig Partitioner Defined Vertex Manager: reset
parallelism to " + dynamicParallelism
+ " from " + currentParallelism);
- Map<String, EdgeManagerPluginDescriptor> edgeManagers =
- new HashMap<String, EdgeManagerPluginDescriptor>();
- for(String vertex :
getContext().getInputVertexEdgeProperties().keySet()) {
- EdgeManagerPluginDescriptor edgeManagerDescriptor =
-
EdgeManagerPluginDescriptor.create(ScatterGatherEdgeManager.class.getName());
- edgeManagers.put(vertex, edgeManagerDescriptor);
+ Map<String, EdgeProperty> edgeManagers = new HashMap<String,
EdgeProperty>();
+ for(Map.Entry<String,EdgeProperty> entry :
getContext().getInputVertexEdgeProperties().entrySet()) {
+ EdgeProperty edge = entry.getValue();
+ edge =
EdgeProperty.create(DataMovementType.SCATTER_GATHER, edge.getDataSourceType(),
edge.getSchedulingType(),
+ edge.getEdgeSource(), edge.getEdgeDestination());
+ edgeManagers.put(entry.getKey(), edge);
}
- getContext().setVertexParallelism(dynamicParallelism, null,
edgeManagers, null);
+ getContext().reconfigureVertex(dynamicParallelism, null,
edgeManagers);
}
}
}
Added:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigGraceShuffleVertexManager.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigGraceShuffleVertexManager.java?rev=1674429&view=auto
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigGraceShuffleVertexManager.java
(added)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigGraceShuffleVertexManager.java
Sat Apr 18 03:30:19 2015
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.runtime;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
+import
org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.ParallelismSetter;
+import
org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.TezEstimatedParallelismClearer;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+
+/**
+ * A {@link ShuffleVertexManager} that postpones choosing this vertex's parallelism
+ * until the inputs of one of its predecessors ("grandparents" of this vertex) have
+ * finished. Their actual output sizes are then fed back into the plan, the plan's
+ * parallelism is re-estimated with {@link ParallelismSetter}, and this vertex is
+ * reconfigured with the resulting parallelism and SCATTER_GATHER input edges.
+ *
+ * <p>The vertex user payload must contain a serialized {@link PigContext} under
+ * {@code pig.pigContext} and a serialized {@link TezOperPlan} under {@code pig.tez.plan}.
+ */
+public class PigGraceShuffleVertexManager extends ShuffleVertexManager {
+
+    private static final Log LOG = LogFactory.getLog(PigGraceShuffleVertexManager.class);
+
+    private TezOperPlan tezPlan;
+    /** Plan operator backing this vertex; resolved once in initialize(). */
+    private TezOperator thisOp;
+    /** Vertex names of the grandparents whose completion is awaited. */
+    private List<String> grandParents = new ArrayList<String>();
+    /** Grandparent vertex names that have reported SUCCEEDED so far. */
+    private List<String> finishedGrandParents = new ArrayList<String>();
+    private long bytesPerTask;
+    private Configuration conf;
+    private PigContext pc;
+    private int thisParallelism = -1;
+    private boolean parallelismSet = false;
+
+    public PigGraceShuffleVertexManager(VertexManagerPluginContext context) {
+        super(context);
+    }
+
+    /**
+     * Deserializes the Pig plan from the user payload, resolves this vertex's operator
+     * and its grandparents, and registers for SUCCEEDED updates from each grandparent.
+     *
+     * @throws TezUncheckedException if the payload cannot be read or deserialized
+     */
+    @Override
+    public synchronized void initialize() {
+        try {
+            conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
+            bytesPerTask = conf.getLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
+                    InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER);
+            pc = (PigContext) ObjectSerializer.deserialize(conf.get("pig.pigContext"));
+            tezPlan = (TezOperPlan) ObjectSerializer.deserialize(conf.get("pig.tez.plan"));
+            // Presumably resets compile-time estimates so they are recomputed from
+            // runtime statistics later (see ParallelismSetter below).
+            TezEstimatedParallelismClearer clearer = new TezEstimatedParallelismClearer(tezPlan);
+            try {
+                clearer.visit();
+            } catch (VisitorException e) {
+                throw new TezUncheckedException(e);
+            }
+            thisOp = tezPlan.getOperator(OperatorKey.fromString(getContext().getVertexName()));
+
+            // Collect grandparents of the vertex. Materialize the transformed list so the
+            // function is not re-applied on every contains() lookup.
+            Function<TezOperator, String> tezOpToString = new Function<TezOperator, String>() {
+                @Override
+                public String apply(TezOperator op) {
+                    return op.getOperatorKey().toString();
+                }
+            };
+            grandParents = new ArrayList<String>(Lists.transform(
+                    TezOperPlan.getGrandParentsForGraceParallelism(tezPlan, thisOp), tezOpToString));
+        } catch (IOException e) {
+            throw new TezUncheckedException(e);
+        }
+
+        // Register notification for grandparents
+        for (String grandParent : grandParents) {
+            getContext().registerForVertexStateUpdates(grandParent, EnumSet.of(VertexState.SUCCEEDED));
+        }
+        super.initialize();
+    }
+
+    /**
+     * Records finished grandparents. Once every input of at least one predecessor has
+     * finished, re-estimates and sets this vertex's parallelism (only once).
+     *
+     * @throws TezUncheckedException if re-estimation or reconfiguration fails
+     */
+    @Override
+    public synchronized void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
+        super.onVertexStateUpdated(stateUpdate);
+        if (parallelismSet) {
+            return;
+        }
+        String vertexName = stateUpdate.getVertexName();
+        if (grandParents.contains(vertexName) && !finishedGrandParents.contains(vertexName)) {
+            finishedGrandParents.add(vertexName);
+        }
+
+        List<TezOperator> preds = predecessorsOf(thisOp);
+        // A predecessor whose inputs are all finished is about to start, so this vertex's
+        // parallelism has to be decided now.
+        if (!isAnyPredAboutToStart(preds)) {
+            return;
+        }
+
+        updateEstimatesFromFinishedGrandParents(preds);
+        try {
+            ParallelismSetter parallelismSetter = new ParallelismSetter(tezPlan, pc);
+            parallelismSetter.visit();
+            thisParallelism = thisOp.getEstimatedParallelism();
+        } catch (IOException e) {
+            throw new TezUncheckedException(e);
+        }
+        try {
+            getContext().reconfigureVertex(thisParallelism, null, scatterGatherInputEdges());
+        } catch (TezException e) {
+            throw new TezUncheckedException(e);
+        }
+        parallelismSet = true;
+        LOG.info("Initialize parallelism for " + getContext().getVertexName() + " to " + thisParallelism);
+    }
+
+    /** Returns the plan predecessors of {@code op}, or an empty list when it has none (null). */
+    private List<TezOperator> predecessorsOf(TezOperator op) {
+        List<TezOperator> preds = tezPlan.getPredecessors(op);
+        return preds == null ? new ArrayList<TezOperator>() : preds;
+    }
+
+    /** Returns true when all inputs of at least one of {@code preds} have finished. */
+    private boolean isAnyPredAboutToStart(List<TezOperator> preds) {
+        for (TezOperator pred : preds) {
+            boolean predAboutToStart = true;
+            for (TezOperator predPred : predecessorsOf(pred)) {
+                if (!finishedGrandParents.contains(predPred.getOperatorKey().toString())) {
+                    predAboutToStart = false;
+                    break;
+                }
+            }
+            if (predAboutToStart) {
+                LOG.info("All predecessors for " + pred.getOperatorKey().toString() + " are finished, time to " +
+                        "set parallelism for " + getContext().getVertexName());
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * For each predecessor without a user-requested parallelism (-1), sets the estimated
+     * parallelism of its finished inputs from their actual output size toward it.
+     */
+    private void updateEstimatesFromFinishedGrandParents(List<TezOperator> preds) {
+        for (TezOperator pred : preds) {
+            if (pred.getRequestedParallelism() != -1) {
+                continue;
+            }
+            String predVertexName = pred.getOperatorKey().toString();
+            for (TezOperator predPred : predecessorsOf(pred)) {
+                String predPredVertexName = predPred.getOperatorKey().toString();
+                if (!finishedGrandParents.contains(predPredVertexName)) {
+                    continue;
+                }
+                // We shall get precise output size since this vertex has finished
+                long outputSize = getContext().getVertexStatistics(predPredVertexName)
+                        .getOutputStatistics(predVertexName).getDataSize();
+                // Never ask for fewer than one task, even when the output is empty
+                int desiredNumReducers = Math.max(1, (int) Math.ceil((double) outputSize / bytesPerTask));
+                predPred.setEstimatedParallelism(desiredNumReducers);
+                LOG.info(getContext().getVertexName() + ": Grandparent " + predPredVertexName +
+                        " finished with actual output " + outputSize + " (desired parallelism " + desiredNumReducers + ")");
+            }
+        }
+    }
+
+    /** Rebuilds every input edge of this vertex as SCATTER_GATHER, keyed by source vertex name. */
+    private Map<String, EdgeProperty> scatterGatherInputEdges() {
+        Map<String, EdgeProperty> edgeManagers = new HashMap<String, EdgeProperty>();
+        for (Map.Entry<String, EdgeProperty> entry : getContext().getInputVertexEdgeProperties().entrySet()) {
+            EdgeProperty edge = entry.getValue();
+            edgeManagers.put(entry.getKey(), EdgeProperty.create(DataMovementType.SCATTER_GATHER,
+                    edge.getDataSourceType(), edge.getSchedulingType(),
+                    edge.getEdgeSource(), edge.getEdgeDestination()));
+        }
+        return edgeManagers;
+    }
+}
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java?rev=1674429&r1=1674428&r2=1674429&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
Sat Apr 18 03:30:19 2015
@@ -198,7 +198,7 @@ public class TezCompilerUtil {
*/
static public boolean isIntermediateReducer(TezOperator tezOper) throws
VisitorException {
boolean intermediateReducer = false;
- LinkedList<POStore> stores =
PlanHelper.getPhysicalOperators(tezOper.plan, POStore.class);
+ LinkedList<POStore> stores = tezOper.getStores();
// Not map and not final reducer
if (stores.size() <= 0 &&
(tezOper.getLoaderInfo().getLoads() == null ||
tezOper.getLoaderInfo().getLoads().size() <= 0)) {
Modified: pig/trunk/src/org/apache/pig/impl/plan/OperatorKey.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/plan/OperatorKey.java?rev=1674429&r1=1674428&r2=1674429&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/plan/OperatorKey.java (original)
+++ pig/trunk/src/org/apache/pig/impl/plan/OperatorKey.java Sat Apr 18 03:30:19
2015
@@ -89,5 +89,10 @@ public class OperatorKey implements Seri
NodeIdGenerator.getGenerator().getNextNodeId(scope));
}
+    /**
+     * Parses the string form of an operator key, {@code <scope>-<id>} (for example
+     * {@code "scope-52"}), back into an OperatorKey. The scope is everything before the
+     * first '-' and the id is everything after it.
+     *
+     * @param op key string such as a Tez vertex name
+     * @return the parsed key
+     * @throws IllegalArgumentException if {@code op} has no '-' separator or the id part
+     *         is not a valid long
+     */
+    static public OperatorKey fromString(String op) {
+        int sep = op.indexOf('-');
+        if (sep < 0) {
+            throw new IllegalArgumentException("Not a valid operator key (expected <scope>-<id>): " + op);
+        }
+        String scope = op.substring(0, sep);
+        long id = Long.parseLong(op.substring(sep + 1));
+        return new OperatorKey(scope, id);
+    }
}
Modified:
pig/trunk/test/org/apache/pig/newplan/logical/optimizer/TestImplicitSplitOnTuple.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/newplan/logical/optimizer/TestImplicitSplitOnTuple.java?rev=1674429&r1=1674428&r2=1674429&view=diff
==============================================================================
---
pig/trunk/test/org/apache/pig/newplan/logical/optimizer/TestImplicitSplitOnTuple.java
(original)
+++
pig/trunk/test/org/apache/pig/newplan/logical/optimizer/TestImplicitSplitOnTuple.java
Sat Apr 18 03:30:19 2015
@@ -56,7 +56,7 @@ public class TestImplicitSplitOnTuple {
"D2 = FOREACH tuplified GENERATE tuplify.memberId as memberId,
tuplify.shopId as shopId, score AS score;"+
"J = JOIN D1 By shopId, D2 by shopId;"+
"K = FOREACH J GENERATE D1::memberId AS member_id1,
D2::memberId AS member_id2, D1::shopId as shop;"+
- "L = ORDER K by shop;"+
+ "L = ORDER K by shop, member_id1, member_id2;"+
"STORE L into 'output' using mock.Storage;");
List<Tuple> list = data.get("output");
assertEquals("list: "+list, 20, list.size());
Modified: pig/trunk/test/org/apache/pig/test/Util.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/Util.java?rev=1674429&r1=1674428&r2=1674429&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/Util.java (original)
+++ pig/trunk/test/org/apache/pig/test/Util.java Sat Apr 18 03:30:19 2015
@@ -1359,11 +1359,13 @@ public class Util {
}
}
- public static void createLogAppender(Class clazz, String appenderName,
Writer writer) {
- Logger logger = Logger.getLogger(clazz);
- WriterAppender writerAppender = new WriterAppender(new
PatternLayout("%d [%t] %-5p %c %x - %m%n"), writer);
- writerAppender.setName(appenderName);
- logger.addAppender(writerAppender);
+ public static void createLogAppender(String appenderName, Writer writer,
Class...clazzes) {
+ for (Class clazz : clazzes) {
+ Logger logger = Logger.getLogger(clazz);
+ WriterAppender writerAppender = new WriterAppender(new
PatternLayout("%d [%t] %-5p %c %x - %m%n"), writer);
+ writerAppender.setName(appenderName);
+ logger.addAppender(writerAppender);
+ }
}
public static void removeLogAppender(Class clazz, String appenderName) {
Modified: pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java?rev=1674429&r1=1674428&r2=1674429&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java Sat Apr 18
03:30:19 2015
@@ -307,7 +307,7 @@ public class TestTezAutoParallelism {
PigServer.resetScope();
StringWriter writer = new StringWriter();
// When there is a combiner operation involved user specified
parallelism is overriden
- Util.createLogAppender(ParallelismSetter.class,
"testIncreaseIntermediateParallelism", writer);
+ Util.createLogAppender("testIncreaseIntermediateParallelism", writer,
ParallelismSetter.class);
try {
pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION,
"true");
pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE,
"4000");
Added: pig/trunk/test/org/apache/pig/tez/TestTezGraceParallelism.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezGraceParallelism.java?rev=1674429&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestTezGraceParallelism.java (added)
+++ pig/trunk/test/org/apache/pig/tez/TestTezGraceParallelism.java Sat Apr 18
03:30:19 2015
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.tez;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.regex.Pattern;
+
+import org.apache.pig.PigServer;
+import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
+import
org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigGraceShuffleVertexManager;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.test.Util;
+import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestTezGraceParallelism {
+ private static PigServer pigServer;
+ private static final String INPUT_FILE1 =
TestTezGraceParallelism.class.getName() + "_1";
+ private static final String INPUT_FILE2 =
TestTezGraceParallelism.class.getName() + "_2";
+ private static final String INPUT_DIR =
Util.getTestDirectory(TestTezGraceParallelism.class);
+
+ @Before
+ public void setUp() throws Exception {
+ pigServer = new PigServer(Util.getLocalTestMode());
+ }
+
+ private static void createFiles() throws IOException {
+ new File(INPUT_DIR).mkdirs();
+
+ PrintWriter w = new PrintWriter(new FileWriter(INPUT_DIR + "/" +
INPUT_FILE1));
+
+ String boyNames[] = {"Noah", "Liam", "Jacob", "Mason", "William",
+ "Ethan", "Michael", "Alexander", "Jayden", "Daniel"};
+ String girlNames[] = {"Sophia", "Emma", "Olivia", "Isabella", "Ava",
+ "Mia", "Emily", "Abigail", "Madison", "Elizabeth"};
+
+ String names[] = new String[boyNames.length + girlNames.length];
+ for (int i=0;i<boyNames.length;i++) {
+ names[i] = boyNames[i];
+ }
+ for (int i=0;i<girlNames.length;i++) {
+ names[boyNames.length+i] = girlNames[i];
+ }
+
+ Random rand = new Random(1);
+ for (int i=0;i<1000;i++) {
+ w.println(names[rand.nextInt(names.length)] + "\t" +
rand.nextInt(18));
+ }
+ w.close();
+
+ w = new PrintWriter(new FileWriter(INPUT_DIR + "/" + INPUT_FILE2));
+ for (String name : boyNames) {
+ w.println(name + "\t" + "M");
+ }
+ for (String name : girlNames) {
+ w.println(name + "\t" + "F");
+ }
+ w.close();
+ }
+
+ private static void deleteFiles() {
+ Util.deleteDirectory(new File(INPUT_DIR));
+ }
+
+ @BeforeClass
+ public static void oneTimeSetUp() throws Exception {
+ createFiles();
+ }
+
+ @AfterClass
+ public static void oneTimeTearDown() throws Exception {
+ deleteFiles();
+ }
+
+ @Test
+ public void testDecreaseParallelism() throws IOException{
+ NodeIdGenerator.reset();
+ PigServer.resetScope();
+ StringWriter writer = new StringWriter();
+ Util.createLogAppender("testDecreaseParallelism", writer, new
Class[]{PigGraceShuffleVertexManager.class, ShuffleVertexManager.class});
+ try {
+ // DAG: 47 \
+ // -> 49(join) -> 52(distinct) -> 61(group)
+ // 48 /
+ // Parallelism at compile time:
+ // DAG: 47(1) \
+ // -> 49(2) -> 52(20) -> 61(200)
+ // 48(1) /
+ // However, when 49 finishes, the actual output of 49 only justify
parallelism 1.
+ // We adjust the parallelism for 61 to 100 based on this.
+ // At runtime, ShuffleVertexManager still kick in and further
reduce parallelism from 100 to 1.
+ //
+ pigServer.registerQuery("A = load '" + INPUT_DIR + "/" +
INPUT_FILE1 + "' as (name:chararray, age:int);");
+ pigServer.registerQuery("B = load '" + INPUT_DIR + "/" +
INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
+ pigServer.registerQuery("C = join A by name, B by name;");
+ pigServer.registerQuery("D = foreach C generate A::name as name,
A::age as age, gender;");
+ pigServer.registerQuery("E = distinct D;");
+ pigServer.registerQuery("F = group E by gender;");
+ pigServer.registerQuery("G = foreach F generate group as gender,
SUM(E.age);");
+ Iterator<Tuple> iter = pigServer.openIterator("G");
+ List<Tuple> expectedResults = Util
+ .getTuplesFromConstantTupleStrings(new String[] {
+ "('F',1349L)", "('M',1373L)"});
+ Util.checkQueryOutputsAfterSort(iter, expectedResults);
+ assertTrue(writer.toString().contains("Initialize parallelism for
scope-52 to 20"));
+ assertTrue(writer.toString().contains("Initialize parallelism for
scope-61 to 100"));
+ assertTrue(writer.toString().contains("Reduce auto parallelism for
vertex: scope-49 to 1 from 2"));
+ assertTrue(writer.toString().contains("Reduce auto parallelism for
vertex: scope-52 to 1 from 20"));
+ assertTrue(writer.toString().contains("Reduce auto parallelism for
vertex: scope-61 to 1 from 100"));
+ } finally {
+ Util.removeLogAppender(PigGraceShuffleVertexManager.class,
"testDecreaseParallelism");
+ Util.removeLogAppender(ShuffleVertexManager.class,
"testDecreaseParallelism");
+ }
+ }
+
+ @Test
+ public void testIncreaseParallelism() throws IOException{
+ NodeIdGenerator.reset();
+ PigServer.resetScope();
+ StringWriter writer = new StringWriter();
+ Util.createLogAppender("testIncreaseParallelism", writer, new
Class[]{PigGraceShuffleVertexManager.class, ShuffleVertexManager.class});
+ try {
+ // DAG: 35 \ / 46(sample) \
+ // -> 37(order) -> 56(order) -> 58(order) ->
64(distinct)
+ // 36 /
+ // Parallelism at compile time:
+ // DAG: 35(1) \ / 46(1) \
+ // -> 37(2) -> 56(-1) -> 58(-1) -> 64(20)
+ // 36(1) /
+ // However, when 56 finishes, the actual output of 56 need
parallelism 5.
+ // We adjust the parallelism for 64 to 50 based on this.
+ // At runtime, ShuffleVertexManager will play and reduce
parallelism from 50
+
pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
"80000");
+ pigServer.registerQuery("A = load '" + INPUT_DIR + "/" +
INPUT_FILE1 + "' as (name:chararray, age:int);");
+ pigServer.registerQuery("B = load '" + INPUT_DIR + "/" +
INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
+ pigServer.registerQuery("C = join A by 1, B by 1;");
+ pigServer.registerQuery("D = foreach C generate A::name as name,
A::age as age, gender;");
+ pigServer.registerQuery("E = order D by name;");
+ pigServer.registerQuery("F = distinct E;");
+ Iterator<Tuple> iter = pigServer.openIterator("F");
+ int count = 0;
+ while (iter.hasNext()) {
+ iter.next();
+ count++;
+ }
+ assertEquals(count, 644);
+ System.out.println(writer.toString());
+ assertTrue(writer.toString().contains("Initialize parallelism for
scope-64 to 50"));
+ // There are randomness in which task finishes first, so the auto
parallelism could result different result
+ assertTrue(Pattern.compile("Reduce auto parallelism for vertex:
scope-64 to (\\d+)* from 50").matcher(writer.toString()).find());
+ } finally {
+ Util.removeLogAppender(PigGraceShuffleVertexManager.class,
"testIncreaseParallelism");
+ Util.removeLogAppender(ShuffleVertexManager.class,
"testIncreaseParallelism");
+ }
+ }
+
+ @Test
+ public void testJoinWithDifferentDepth() throws IOException{
+ NodeIdGenerator.reset();
+ PigServer.resetScope();
+ StringWriter writer = new StringWriter();
+ Util.createLogAppender("testJoinWithDifferentDepth", writer,
PigGraceShuffleVertexManager.class);
+ try {
+ // DAG: / 51(sample) \
+ // 42 -> 61(order) -> 63(order) -> 69(distinct)
\
+ //
-> 80(join)
+ // 78 -> 79(group)
/
+ // The join(80) has two inputs: 69 with deeper pipeline, 79 with
narrower.
+ // This test is to make sure 79 can start (by invoking
80.setParallelism) early,
+ // don't need to wait for 63 complete
+ pigServer.registerQuery("A = load '" + INPUT_DIR + "/" +
INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
+ pigServer.registerQuery("B = order A by name;");
+ pigServer.registerQuery("C = distinct B;");
+ pigServer.registerQuery("D = load '" + INPUT_DIR + "/" +
INPUT_FILE1 + "' as (name:chararray, age:int);");
+ pigServer.registerQuery("E = group D by name;");
+ pigServer.registerQuery("F = foreach E generate group as name,
AVG(D.age) as avg_age;");
+ pigServer.registerQuery("G = join C by name, F by name;");
+ Iterator<Tuple> iter = pigServer.openIterator("G");
+ int count = 0;
+ while (iter.hasNext()) {
+ iter.next();
+ count++;
+ }
+ assertEquals(count, 20);
+ assertTrue(writer.toString().contains("All predecessors for
scope-79 are finished, time to set parallelism for scope-80"));
+ assertTrue(writer.toString().contains("Initialize parallelism for
scope-80 to 101"));
+ } finally {
+ Util.removeLogAppender(PigGraceShuffleVertexManager.class,
"testJoinWithDifferentDepth");
+ }
+ }
+
+ @Test
+ public void testJoinWithDifferentDepth2() throws IOException{
+ NodeIdGenerator.reset();
+ PigServer.resetScope();
+ StringWriter writer = new StringWriter();
+ Util.createLogAppender("testJoinWithDifferentDepth2", writer,
PigGraceShuffleVertexManager.class);
+ try {
+ // DAG: / 40(sample) \
+ // 31 -> 50(order) -> 52(order) -> 58(distinct)
\
+ //
-> 68(join)
+ // 67
/
+ // The join(68) should start immediately. We will not use
PigGraceShuffleVertexManager in this case
+ pigServer.registerQuery("A = load '" + INPUT_DIR + "/" +
INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
+ pigServer.registerQuery("B = order A by name;");
+ pigServer.registerQuery("C = distinct B;");
+ pigServer.registerQuery("D = load '" + INPUT_DIR + "/" +
INPUT_FILE1 + "' as (name:chararray, age:int);");
+ pigServer.registerQuery("E = join C by name, D by name;");
+ Iterator<Tuple> iter = pigServer.openIterator("E");
+ int count = 0;
+ while (iter.hasNext()) {
+ iter.next();
+ count++;
+ }
+ assertEquals(count, 1000);
+ assertFalse(writer.toString().contains("scope-68"));
+ } finally {
+ Util.removeLogAppender(PigGraceShuffleVertexManager.class,
"testJoinWithDifferentDepth2");
+ }
+ }
+}
Modified: pig/trunk/test/org/apache/pig/tez/TestTezJobControlCompiler.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezJobControlCompiler.java?rev=1674429&r1=1674428&r2=1674429&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestTezJobControlCompiler.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestTezJobControlCompiler.java Sat Apr 18
03:30:19 2015
@@ -238,8 +238,9 @@ public class TestTezJobControlCompiler {
+ "store e into 'output';";
Pair<TezOperPlan, DAG> compiledPlan = compile(query);
TezOperator leafOper = compiledPlan.first.getLeaves().get(0);
+ assertTrue(leafOper.isUseGraceParallelism());
Vertex leafVertex =
compiledPlan.second.getVertex(leafOper.getOperatorKey().toString());
- assertEquals(leafVertex.getParallelism(), 70);
+ assertEquals(leafVertex.getParallelism(), -1);
}
@Test
@@ -271,8 +272,9 @@ public class TestTezJobControlCompiler {
List<TezOperator> leaves = compiledPlan.first.getLeaves();
Collections.sort(leaves);
TezOperator leafOper = leaves.get(1);
+ assertTrue(leafOper.isUseGraceParallelism());
Vertex leafVertex =
compiledPlan.second.getVertex(leafOper.getOperatorKey().toString());
- assertEquals(leafVertex.getParallelism(), 7);
+ assertEquals(leafVertex.getParallelism(), -1);
}
@Test
Modified: pig/trunk/test/org/apache/pig/tez/TestTezLauncher.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezLauncher.java?rev=1674429&r1=1674428&r2=1674429&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestTezLauncher.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestTezLauncher.java Sat Apr 18 03:30:19
2015
@@ -20,12 +20,17 @@ package org.apache.pig.tez;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import java.util.Arrays;
+import java.util.Iterator;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.pig.PigServer;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.tez.TezExecType;
import org.apache.pig.backend.hadoop.executionengine.tez.TezLauncher;
import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.test.MiniGenericCluster;
import org.apache.pig.test.Util;
@@ -56,8 +61,8 @@ public class TestTezLauncher {
};
private static final String OUTPUT_FILE = "TestTezLauncherOutput";
- private static final String[] OUTPUT_RECORDS = {
- "all\t{(apple),(pear),(pear),(strawberry),(orange)}"
+ private static final String[] OUTPUT_RECORDS = new String[] {
+ "(apple)", "(pear)", "(pear)", "(strawberry)", "(orange)"
};
@BeforeClass
@@ -96,16 +101,25 @@ public class TestTezLauncher {
PigStats pigStats = launcher.launchPig(pp, "testRun1", pc);
assertTrue(pigStats.isSuccessful());
- String[] output = Util.readOutput(cluster.getFileSystem(),
OUTPUT_FILE);
- for (int i = 0; i < output.length; i++) {
- assertEquals(OUTPUT_RECORDS[i], output[i]);
- }
-
assertEquals(1, pigStats.getInputStats().size());
assertEquals(INPUT_FILE, pigStats.getInputStats().get(0).getName());
assertEquals(1, pigStats.getOutputStats().size());
assertEquals(OUTPUT_FILE, pigStats.getOutputStats().get(0).getName());
+
+ query = "m = load '" + OUTPUT_FILE + "' as (a:chararray,
b:{(y:chararray)});";
+ pigServer = new PigServer(pc);
+ pigServer.registerQuery(query);
+ Iterator<Tuple> iter = pigServer.openIterator("m");
+ Tuple result = iter.next();
+ assertEquals(result.get(0).toString(), "all");
+ Iterator<Tuple> innerIter = ((DataBag)result.get(1)).iterator();
+ int count = 0;
+ while (innerIter.hasNext()) {
+
assertTrue(Arrays.asList(OUTPUT_RECORDS).contains(innerIter.next().toString()));
+ count++;
+ }
+ assertEquals(count, OUTPUT_RECORDS.length);
}
@Test