Repository: incubator-systemml
Updated Branches:
  refs/heads/master f9b53b96f -> 28fe4fe8f


[SYSTEMML-1276] Fix cluster analysis (avoid yarn calls in spark modes)

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/28fe4fe8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/28fe4fe8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/28fe4fe8

Branch: refs/heads/master
Commit: 28fe4fe8ff28cb093ed345cd22e6280db6654888
Parents: f9b53b9
Author: Matthias Boehm <[email protected]>
Authored: Sat Mar 11 19:42:53 2017 -0800
Committer: Matthias Boehm <[email protected]>
Committed: Sat Mar 11 19:42:53 2017 -0800

----------------------------------------------------------------------
 .../org/apache/sysml/hops/OptimizerUtils.java   |  6 +++
 .../controlprogram/ParForProgramBlock.java      | 24 ++++++----
 .../parfor/opt/OptimizerRuleBased.java          | 50 +++++++++++---------
 .../parfor/stat/InfrastructureAnalyzer.java     |  8 +++-
 4 files changed, 55 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/28fe4fe8/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java 
b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
index 6efd799..1a959b6 100644
--- a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
+++ b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
@@ -487,6 +487,9 @@ public class OptimizerUtils
         */
        public static int getNumReducers( boolean configOnly )
        {
+               if( isSparkExecutionMode() )
+                       return 
SparkExecutionContext.getDefaultParallelism(false);
+               
                int ret = ConfigurationManager.getNumReducers();
                if( !configOnly ) {
                        ret = 
Math.min(ret,InfrastructureAnalyzer.getRemoteParallelReduceTasks());
@@ -501,6 +504,9 @@ public class OptimizerUtils
 
        public static int getNumMappers()
        {
+               if( isSparkExecutionMode() )
+                       return 
SparkExecutionContext.getDefaultParallelism(false);
+               
                int ret = InfrastructureAnalyzer.getRemoteParallelMapTasks();
                        
                //correction max number of reducers on yarn clusters

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/28fe4fe8/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java 
b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
index 1023a80..3d1e6ee 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
@@ -1481,16 +1481,24 @@ public class ParForProgramBlock extends ForProgramBlock
                ResultMerge rm = null;
                
                //determine degree of parallelism
-               int numReducers = ConfigurationManager.getNumReducers();
-               int maxMap = InfrastructureAnalyzer.getRemoteParallelMapTasks();
-               int maxRed = 
InfrastructureAnalyzer.getRemoteParallelReduceTasks();
-               //correction max number of reducers on yarn clusters
-               if( InfrastructureAnalyzer.isYarnEnabled() ) {                  
                
-                       maxMap = (int)Math.max( maxMap, 
YarnClusterAnalyzer.getNumCores() );    
-                       maxRed = (int)Math.max( maxRed, 
YarnClusterAnalyzer.getNumCores()/2 );  
+               int maxMap = -1, maxRed = -1;
+               if( OptimizerUtils.isSparkExecutionMode() ) {
+                       maxMap = (int) 
SparkExecutionContext.getDefaultParallelism(true);
+                       maxRed = maxMap; //equal map/reduce
+               }
+               else {
+                       int numReducers = ConfigurationManager.getNumReducers();
+                       maxMap = 
InfrastructureAnalyzer.getRemoteParallelMapTasks();
+                       maxRed = Math.min(numReducers, 
+                               
InfrastructureAnalyzer.getRemoteParallelReduceTasks());
+                       //correction max number of reducers on yarn clusters
+                       if( InfrastructureAnalyzer.isYarnEnabled() ) {          
                        
+                               maxMap = (int)Math.max( maxMap, 
YarnClusterAnalyzer.getNumCores() );    
+                               maxRed = (int)Math.max( maxRed, 
YarnClusterAnalyzer.getNumCores()/2 );  
+                       }
                }
                int numMap = Math.max(_numThreads, maxMap);
-               int numRed = Math.min(numReducers, maxRed);
+               int numRed = maxRed;
                
                //create result merge implementation            
                switch( prm )

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/28fe4fe8/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
index 90db845..554b217 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
@@ -361,38 +361,42 @@ public class OptimizerRuleBased extends Optimizer
                _lk      = InfrastructureAnalyzer.getLocalParallelism();
                _lkmaxCP = (int) Math.ceil( PAR_K_FACTOR * _lk ); 
                _lkmaxMR = (int) Math.ceil( PAR_K_MR_FACTOR * _lk );
-               _rnk     = InfrastructureAnalyzer.getRemoteParallelNodes();  
-               _rk      = InfrastructureAnalyzer.getRemoteParallelMapTasks();
-               _rk2     = 
InfrastructureAnalyzer.getRemoteParallelReduceTasks();
-               _rkmax   = (int) Math.ceil( PAR_K_FACTOR * _rk ); 
-               _rkmax2  = (int) Math.ceil( PAR_K_FACTOR * _rk2 ); 
                _lm      = OptimizerUtils.getLocalMemBudget();
-               _rm      = OptimizerUtils.getRemoteMemBudgetMap(false);         
-               _rm2     = OptimizerUtils.getRemoteMemBudgetReduce();   
                
-               //correction of max parallelism if yarn enabled because yarn
-               //does not have the notion of map/reduce slots and hence 
returns 
-               //small constants of map=10*nodes, reduce=2*nodes
-               //(not doing this correction would loose available degree of 
parallelism)
-               if( InfrastructureAnalyzer.isYarnEnabled() ) {
-                       long tmprk = YarnClusterAnalyzer.getNumCores();
-                       _rk = (int) Math.max( _rk, tmprk );
-                       _rk2 = (int) Math.max( _rk2, tmprk/2 );
-               }
-               
-               //correction of max parallelism and memory if spark runtime 
enabled because
-               //spark limits the available parallelism by its own executor 
configuration
+               //spark-specific cluster characteristics
                if( OptimizerUtils.isSparkExecutionMode() ) {
-                       _rk = (int) 
SparkExecutionContext.getDefaultParallelism(true);
+                       //we get all required cluster characteristics from 
spark's configuration
+                       //to avoid invoking yarns cluster status
+                       _rnk = SparkExecutionContext.getNumExecutors(); 
+                       _rk  = (int) 
SparkExecutionContext.getDefaultParallelism(true);
                        _rk2 = _rk; //equal map/reduce unless we find 
counter-examples 
-                       _rkmax   = (int) Math.ceil( PAR_K_FACTOR * _rk ); 
-                       _rkmax2  = (int) Math.ceil( PAR_K_FACTOR * _rk2 ); 
                        int cores = 
SparkExecutionContext.getDefaultParallelism(true)
                                        / 
SparkExecutionContext.getNumExecutors();
                        int ccores = (int) Math.min(cores, _N);
-                       _rm = SparkExecutionContext.getBroadcastMemoryBudget() 
/ ccores;
+                       _rm  = SparkExecutionContext.getBroadcastMemoryBudget() 
/ ccores;
                        _rm2 = SparkExecutionContext.getBroadcastMemoryBudget() 
/ ccores;
                }
+               //mr/yarn-specific cluster characteristics
+               else {
+                       _rnk = InfrastructureAnalyzer.getRemoteParallelNodes(); 
 
+                       _rk  = 
InfrastructureAnalyzer.getRemoteParallelMapTasks();
+                       _rk2 = 
InfrastructureAnalyzer.getRemoteParallelReduceTasks();
+                       _rm  = OptimizerUtils.getRemoteMemBudgetMap(false);     
+                       _rm2 = OptimizerUtils.getRemoteMemBudgetReduce();       
+               
+                       //correction of max parallelism if yarn enabled because 
yarn
+                       //does not have the notion of map/reduce slots and 
hence returns 
+                       //small constants of map=10*nodes, reduce=2*nodes
+                       //(not doing this correction would loose available 
degree of parallelism)
+                       if( InfrastructureAnalyzer.isYarnEnabled() ) {
+                               long tmprk = YarnClusterAnalyzer.getNumCores();
+                               _rk  = (int) Math.max( _rk, tmprk );
+                               _rk2 = (int) Math.max( _rk2, tmprk/2 );
+                       }
+               }
+               
+               _rkmax   = (int) Math.ceil( PAR_K_FACTOR * _rk ); 
+               _rkmax2  = (int) Math.ceil( PAR_K_FACTOR * _rk2 ); 
        }
        
        ///////

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/28fe4fe8/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java
index 7cb01a2..d04d69c 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java
@@ -26,6 +26,8 @@ import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.sysml.conf.ConfigurationManager;
+import org.apache.sysml.hops.OptimizerUtils;
+import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
 import org.apache.sysml.runtime.util.UtilFunctions;
 
@@ -243,8 +245,10 @@ public class InfrastructureAnalyzer
         */
        public static int getCkMaxMR() 
        {
-               //default value (if not specified)
-               return getRemoteParallelMapTasks();
+               if( OptimizerUtils.isSparkExecutionMode() )
+                       return 
SparkExecutionContext.getDefaultParallelism(true);
+               else
+                       return getRemoteParallelMapTasks();
        }
 
        /**

Reply via email to