Repository: incubator-systemml Updated Branches: refs/heads/master f9b53b96f -> 28fe4fe8f
[SYSTEMML-1276] Fix cluster analysis (avoid yarn calls in spark modes) Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/28fe4fe8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/28fe4fe8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/28fe4fe8 Branch: refs/heads/master Commit: 28fe4fe8ff28cb093ed345cd22e6280db6654888 Parents: f9b53b9 Author: Matthias Boehm <[email protected]> Authored: Sat Mar 11 19:42:53 2017 -0800 Committer: Matthias Boehm <[email protected]> Committed: Sat Mar 11 19:42:53 2017 -0800 ---------------------------------------------------------------------- .../org/apache/sysml/hops/OptimizerUtils.java | 6 +++ .../controlprogram/ParForProgramBlock.java | 24 ++++++---- .../parfor/opt/OptimizerRuleBased.java | 50 +++++++++++--------- .../parfor/stat/InfrastructureAnalyzer.java | 8 +++- 4 files changed, 55 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/28fe4fe8/src/main/java/org/apache/sysml/hops/OptimizerUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java index 6efd799..1a959b6 100644 --- a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java +++ b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java @@ -487,6 +487,9 @@ public class OptimizerUtils */ public static int getNumReducers( boolean configOnly ) { + if( isSparkExecutionMode() ) + return SparkExecutionContext.getDefaultParallelism(false); + int ret = ConfigurationManager.getNumReducers(); if( !configOnly ) { ret = Math.min(ret,InfrastructureAnalyzer.getRemoteParallelReduceTasks()); @@ -501,6 +504,9 @@ public class OptimizerUtils public static int getNumMappers() { + if( isSparkExecutionMode() ) + return SparkExecutionContext.getDefaultParallelism(false); + int ret = InfrastructureAnalyzer.getRemoteParallelMapTasks(); //correction max number of reducers on yarn clusters http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/28fe4fe8/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java index 1023a80..3d1e6ee 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java @@ -1481,16 +1481,24 @@ public class ParForProgramBlock extends ForProgramBlock ResultMerge rm = null; //determine degree of parallelism - int numReducers = ConfigurationManager.getNumReducers(); - int maxMap = InfrastructureAnalyzer.getRemoteParallelMapTasks(); - int maxRed = InfrastructureAnalyzer.getRemoteParallelReduceTasks(); - //correction max number of reducers on yarn clusters - if( InfrastructureAnalyzer.isYarnEnabled() ) { - maxMap = (int)Math.max( maxMap, YarnClusterAnalyzer.getNumCores() ); - maxRed = (int)Math.max( maxRed, YarnClusterAnalyzer.getNumCores()/2 ); + int maxMap = -1, maxRed = -1; + if( OptimizerUtils.isSparkExecutionMode() ) { + maxMap = (int) SparkExecutionContext.getDefaultParallelism(true); + maxRed = maxMap; //equal map/reduce + } + else { + int numReducers = ConfigurationManager.getNumReducers(); + maxMap = InfrastructureAnalyzer.getRemoteParallelMapTasks(); + maxRed = Math.min(numReducers, + InfrastructureAnalyzer.getRemoteParallelReduceTasks()); + //correction max number of reducers on yarn clusters + if( InfrastructureAnalyzer.isYarnEnabled() ) { + maxMap = (int)Math.max( maxMap, YarnClusterAnalyzer.getNumCores() ); + maxRed = (int)Math.max( maxRed, YarnClusterAnalyzer.getNumCores()/2 ); + } } int numMap = Math.max(_numThreads, maxMap); - int numRed = Math.min(numReducers, maxRed); + int numRed = maxRed; //create result merge implementation switch( prm ) http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/28fe4fe8/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java index 90db845..554b217 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java @@ -361,38 +361,42 @@ public class OptimizerRuleBased extends Optimizer _lk = InfrastructureAnalyzer.getLocalParallelism(); _lkmaxCP = (int) Math.ceil( PAR_K_FACTOR * _lk ); _lkmaxMR = (int) Math.ceil( PAR_K_MR_FACTOR * _lk ); - _rnk = InfrastructureAnalyzer.getRemoteParallelNodes(); - _rk = InfrastructureAnalyzer.getRemoteParallelMapTasks(); - _rk2 = InfrastructureAnalyzer.getRemoteParallelReduceTasks(); - _rkmax = (int) Math.ceil( PAR_K_FACTOR * _rk ); - _rkmax2 = (int) Math.ceil( PAR_K_FACTOR * _rk2 ); _lm = OptimizerUtils.getLocalMemBudget(); - _rm = OptimizerUtils.getRemoteMemBudgetMap(false); - _rm2 = OptimizerUtils.getRemoteMemBudgetReduce(); - //correction of max parallelism if yarn enabled because yarn - //does not have the notion of map/reduce slots and hence returns - //small constants of map=10*nodes, reduce=2*nodes - //(not doing this correction would loose available degree of parallelism) - if( InfrastructureAnalyzer.isYarnEnabled() ) { - long tmprk = YarnClusterAnalyzer.getNumCores(); - _rk = (int) Math.max( _rk, tmprk ); - _rk2 = (int) Math.max( _rk2, tmprk/2 ); - } - - //correction of max parallelism and memory if spark runtime enabled because - //spark limits the available parallelism by its own executor configuration + //spark-specific cluster characteristics if( OptimizerUtils.isSparkExecutionMode() ) { - _rk = (int) SparkExecutionContext.getDefaultParallelism(true); + //we get all required cluster characteristics from spark's configuration + //to avoid invoking yarns cluster status + _rnk = SparkExecutionContext.getNumExecutors(); + _rk = (int) SparkExecutionContext.getDefaultParallelism(true); _rk2 = _rk; //equal map/reduce unless we find counter-examples - _rkmax = (int) Math.ceil( PAR_K_FACTOR * _rk ); - _rkmax2 = (int) Math.ceil( PAR_K_FACTOR * _rk2 ); int cores = SparkExecutionContext.getDefaultParallelism(true) / SparkExecutionContext.getNumExecutors(); int ccores = (int) Math.min(cores, _N); - _rm = SparkExecutionContext.getBroadcastMemoryBudget() / ccores; + _rm = SparkExecutionContext.getBroadcastMemoryBudget() / ccores; _rm2 = SparkExecutionContext.getBroadcastMemoryBudget() / ccores; } + //mr/yarn-specific cluster characteristics + else { + _rnk = InfrastructureAnalyzer.getRemoteParallelNodes(); + _rk = InfrastructureAnalyzer.getRemoteParallelMapTasks(); + _rk2 = InfrastructureAnalyzer.getRemoteParallelReduceTasks(); + _rm = OptimizerUtils.getRemoteMemBudgetMap(false); + _rm2 = OptimizerUtils.getRemoteMemBudgetReduce(); + + //correction of max parallelism if yarn enabled because yarn + //does not have the notion of map/reduce slots and hence returns + //small constants of map=10*nodes, reduce=2*nodes + //(not doing this correction would loose available degree of parallelism) + if( InfrastructureAnalyzer.isYarnEnabled() ) { + long tmprk = YarnClusterAnalyzer.getNumCores(); + _rk = (int) Math.max( _rk, tmprk ); + _rk2 = (int) Math.max( _rk2, tmprk/2 ); + } + } + + _rkmax = (int) Math.ceil( PAR_K_FACTOR * _rk ); + _rkmax2 = (int) Math.ceil( PAR_K_FACTOR * _rk2 ); } /////// http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/28fe4fe8/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java index 7cb01a2..d04d69c 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java @@ -26,6 +26,8 @@ import org.apache.hadoop.mapred.ClusterStatus; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.sysml.conf.ConfigurationManager; +import org.apache.sysml.hops.OptimizerUtils; +import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames; import org.apache.sysml.runtime.util.UtilFunctions; @@ -243,8 +245,10 @@ public class InfrastructureAnalyzer */ public static int getCkMaxMR() { - //default value (if not specified) - return getRemoteParallelMapTasks(); + if( OptimizerUtils.isSparkExecutionMode() ) + return SparkExecutionContext.getDefaultParallelism(true); + else + return getRemoteParallelMapTasks(); } /**
