Author: rohini
Date: Mon Jan 11 02:54:54 2016
New Revision: 1723975

URL: http://svn.apache.org/viewvc?rev=1723975&view=rev
Log:
PIG-4775: Better default values for shuffle bytes per reducer (rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1723975&r1=1723974&r2=1723975&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Jan 11 02:54:54 2016
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-4775: Better default values for shuffle bytes per reducer (rohini)
+
 PIG-4753: Pigmix should have option to delete outputs after completing the tests (mitdesai via rohini)
 
 PIG-4744: Honor tez.staging-dir setting in tez-site.xml (rohini via daijy)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1723975&r1=1723974&r2=1723975&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Mon Jan 11 02:54:54 2016
@@ -165,6 +165,9 @@ import org.apache.tez.runtime.library.in
 public class TezDagBuilder extends TezOpPlanVisitor {
     private static final Log log = LogFactory.getLog(TezDagBuilder.class);
 
+    private static long SHUFFLE_BYTES_PER_REDUCER_GROUPBY_DEFAULT = 384 * 1024 
* 1024L;
+    private static long SHUFFLE_BYTES_PER_REDUCER_DEFAULT = 256 * 1024 * 1024L;
+
     private DAG dag;
     private Map<String, LocalResource> localResources;
     private PigContext pc;
@@ -705,17 +708,25 @@ public class TezDagBuilder extends TezOp
                         vmPluginName = ShuffleVertexManager.class.getName();
                     }
                     
vmPluginConf.setBoolean(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL,
 true);
-                    if (stores.size() <= 0) {
-                        // Intermediate reduce. Set the bytes per reducer to 
be block size.
-                        
vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
-                                        intermediateTaskInputSize);
-                    } else if 
(vmPluginConf.getLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
-                                    
InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER) !=
-                                    
InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER) {
-                        
vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
-                                
vmPluginConf.getLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
-                                        
InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
+                    // For Intermediate reduce, set the bytes per reducer to 
be block size.
+                    long bytesPerReducer = intermediateTaskInputSize;
+                    // If there are store statements, use 
BYTES_PER_REDUCER_PARAM configured by user.
+                    // If not as default use 384MB for group bys and 256 MB 
for joins. Not using
+                    // default 1G as that value was suited for mapreduce logic 
where numReducers=(map input size/bytesPerReducer).
+                    // In Tez, numReducers=(map output size/bytesPerReducer) 
we need lower values to avoid skews in reduce
+                    // as map input sizes are mostly always high compared to 
map output.
+                    if (stores.size() > 0) {
+                        if 
(vmPluginConf.get(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM) != null) {
+                            bytesPerReducer = vmPluginConf.getLong(
+                                            
InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
+                                            
InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER);
+                        } else if (tezOp.isGroupBy()) {
+                            bytesPerReducer = 
SHUFFLE_BYTES_PER_REDUCER_GROUPBY_DEFAULT;
+                        } else {
+                            bytesPerReducer = 
SHUFFLE_BYTES_PER_REDUCER_DEFAULT;
+                        }
                     }
+                    
vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
 bytesPerReducer);
                     log.info("Set auto parallelism for vertex " + 
tezOp.getOperatorKey().toString());
                 }
             }


Reply via email to