Author: rohini
Date: Mon Jan 11 02:54:54 2016
New Revision: 1723975
URL: http://svn.apache.org/viewvc?rev=1723975&view=rev
Log:
PIG-4775: Better default values for shuffle bytes per reducer (rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1723975&r1=1723974&r2=1723975&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Jan 11 02:54:54 2016
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-4775: Better default values for shuffle bytes per reducer (rohini)
+
PIG-4753: Pigmix should have option to delete outputs after completing the
tests (mitdesai via rohini)
PIG-4744: Honor tez.staging-dir setting in tez-site.xml (rohini via daijy)
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1723975&r1=1723974&r2=1723975&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Mon Jan 11 02:54:54 2016
@@ -165,6 +165,9 @@ import org.apache.tez.runtime.library.in
public class TezDagBuilder extends TezOpPlanVisitor {
private static final Log log = LogFactory.getLog(TezDagBuilder.class);
+ private static long SHUFFLE_BYTES_PER_REDUCER_GROUPBY_DEFAULT = 384 * 1024
* 1024L;
+ private static long SHUFFLE_BYTES_PER_REDUCER_DEFAULT = 256 * 1024 * 1024L;
+
private DAG dag;
private Map<String, LocalResource> localResources;
private PigContext pc;
@@ -705,17 +708,25 @@ public class TezDagBuilder extends TezOp
vmPluginName = ShuffleVertexManager.class.getName();
}
vmPluginConf.setBoolean(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL,
true);
- if (stores.size() <= 0) {
- // Intermediate reduce. Set the bytes per reducer to
be block size.
-
vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
- intermediateTaskInputSize);
- } else if
(vmPluginConf.getLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
-
InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER) !=
-
InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER) {
-
vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
-
vmPluginConf.getLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
-
InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
+ // For Intermediate reduce, set the bytes per reducer to
be block size.
+ long bytesPerReducer = intermediateTaskInputSize;
+ // If there are store statements, use
BYTES_PER_REDUCER_PARAM configured by user.
+ // If not as default use 384MB for group bys and 256 MB
for joins. Not using
+ // default 1G as that value was suited for mapreduce logic
where numReducers=(map input size/bytesPerReducer).
+ // In Tez, numReducers=(map output size/bytesPerReducer)
we need lower values to avoid skews in reduce
+ // as map input sizes are mostly always high compared to
map output.
+ if (stores.size() > 0) {
+ if
(vmPluginConf.get(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM) != null) {
+ bytesPerReducer = vmPluginConf.getLong(
+
InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
+
InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER);
+ } else if (tezOp.isGroupBy()) {
+ bytesPerReducer =
SHUFFLE_BYTES_PER_REDUCER_GROUPBY_DEFAULT;
+ } else {
+ bytesPerReducer =
SHUFFLE_BYTES_PER_REDUCER_DEFAULT;
+ }
}
+
vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
bytesPerReducer);
log.info("Set auto parallelism for vertex " +
tezOp.getOperatorKey().toString());
}
}