[SYSTEMML-1889] Robustness of parfor/hop memory budgets in spark exec mode Recently, we encountered robustness issues of parfor memory budgets in spark execution mode (remote memory set to Infinity). This patch improves the robustness of analyzing the spark cluster configuration, and the parfor-specific infrastructure analysis. The encountered issue was likely due to an invalid default parallelism (less than the number of executors), leading to MEM/(dpar/#exec)=INF.
Furthermore, this patch also updates the internal defaults of min and max storage memory fractions to the defaults used in Spark 2.1 and 2.2. Additionally, parfor now also reports the entire Spark cluster config when log=DEBUG, without unnecessarily creating the Spark context. Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/d5f20b43 Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/d5f20b43 Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/d5f20b43 Branch: refs/heads/master Commit: d5f20b43b9e74e73747ee65f11f9767577b001e8 Parents: e60a4c2 Author: Matthias Boehm <[email protected]> Authored: Tue Sep 5 17:57:06 2017 -0700 Committer: Matthias Boehm <[email protected]> Committed: Tue Sep 5 21:18:10 2017 -0700 ---------------------------------------------------------------------- .../context/SparkExecutionContext.java | 28 +++++++++++--------- .../parfor/opt/OptimizerRuleBased.java | 12 ++++++--- 2 files changed, 24 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/d5f20b43/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java index d1ff7d8..e0352b0 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java @@ -1382,7 +1382,7 @@ public class SparkExecutionContext extends ExecutionContext { //broadcasts are stored in mem-and-disk in data space, this config //defines the fraction of data space to be used as broadcast budget - private static final 
double BROADCAST_DATA_FRACTION = 0.3; + private static final double BROADCAST_DATA_FRACTION = 0.35; //forward private config from Spark's UnifiedMemoryManager.scala (>1.6) private static final long RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024; @@ -1430,7 +1430,7 @@ public class SparkExecutionContext extends ExecutionContext //always get the current num executors on refresh because this might //change if not all executors are initially allocated and it is plan-relevant int numExec = _numExecutors; - if( refresh && !_confOnly ) { + if( (refresh && !_confOnly) || isSparkContextCreated() ) { JavaSparkContext jsc = getSparkContextStatic(); numExec = Math.max(jsc.sc().getExecutorMemoryStatus().size() - 1, 1); } @@ -1452,14 +1452,15 @@ public class SparkExecutionContext extends ExecutionContext //always get the current default parallelism on refresh because this might //change if not all executors are initially allocated and it is plan-relevant - return ( refresh && !_confOnly ) ? + int par = ( (refresh && !_confOnly) || isSparkContextCreated() ) ? getSparkContextStatic().defaultParallelism() : _defaultPar; + return Math.max(par, 1); //robustness min parallelism } public void analyzeSparkConfiguationLegacy(SparkConf conf) { //ensure allocated spark conf SparkConf sconf = (conf == null) ? createSystemMLSparkConf() : conf; - + //parse absolute executor memory _memExecutor = UtilFunctions.parseMemorySize( sconf.get("spark.executor.memory", "1g")); @@ -1477,7 +1478,7 @@ public class SparkExecutionContext extends ExecutionContext public void analyzeSparkConfiguation(SparkConf conf) { //ensure allocated spark conf SparkConf sconf = (conf == null) ? 
createSystemMLSparkConf() : conf; - + //parse absolute executor memory, incl fixed cut off _memExecutor = UtilFunctions.parseMemorySize( sconf.get("spark.executor.memory", "1g")) @@ -1485,14 +1486,17 @@ public class SparkExecutionContext extends ExecutionContext //get data and shuffle memory ratios (defaults not specified in job conf) _memDataMinFrac = sconf.getDouble("spark.memory.storageFraction", 0.5); //default 50% - _memDataMaxFrac = sconf.getDouble("spark.memory.fraction", 0.75); //default 75% - _memBroadcastFrac = _memDataMaxFrac * BROADCAST_DATA_FRACTION; //default 22.5% - + _memDataMaxFrac = sconf.getDouble("spark.memory.fraction", 0.6); //default 60% + _memBroadcastFrac = _memDataMaxFrac * BROADCAST_DATA_FRACTION; //default 21% + //analyze spark degree of parallelism analyzeSparkParallelismConfiguation(sconf); } - private void analyzeSparkParallelismConfiguation(SparkConf sconf) { + private void analyzeSparkParallelismConfiguation(SparkConf conf) { + //ensure allocated spark conf + SparkConf sconf = (conf == null) ? 
createSystemMLSparkConf() : conf; + int numExecutors = sconf.getInt("spark.executor.instances", -1); int numCoresPerExec = sconf.getInt("spark.executor.cores", -1); int defaultPar = sconf.getInt("spark.default.parallelism", -1); @@ -1532,14 +1536,14 @@ public class SparkExecutionContext extends ExecutionContext @Override public String toString() { StringBuilder sb = new StringBuilder("SparkClusterConfig: \n"); - sb.append("-- legacyVersion = " + _legacyVersion + " ("+getSparkContextStatic().version()+")\n" ); + sb.append("-- legacyVersion = " + _legacyVersion + " ("+getSparkVersionString()+")\n" ); sb.append("-- confOnly = " + _confOnly + "\n"); + sb.append("-- numExecutors = " + _numExecutors + "\n"); + sb.append("-- defaultPar = " + _defaultPar + "\n"); sb.append("-- memExecutor = " + _memExecutor + "\n"); sb.append("-- memDataMinFrac = " + _memDataMinFrac + "\n"); sb.append("-- memDataMaxFrac = " + _memDataMaxFrac + "\n"); sb.append("-- memBroadcastFrac = " + _memBroadcastFrac + "\n"); - sb.append("-- numExecutors = " + _numExecutors + "\n"); - sb.append("-- defaultPar = " + _defaultPar + "\n"); return sb.toString(); } } http://git-wip-us.apache.org/repos/asf/systemml/blob/d5f20b43/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java index c429bfa..ec229d5 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java @@ -213,9 +213,13 @@ public class OptimizerRuleBased extends Optimizer _cost = est; //debug and warnings output - LOG.debug(getOptMode()+" OPT: Optimize w/ max_mem="+toMB(_lm)+"/"+toMB(_rm)+"/"+toMB(_rm2)+", 
max_k="+_lk+"/"+_rk+"/"+_rk2+")." ); - if( _rnk<=0 || _rk<=0 ) - LOG.warn(getOptMode()+" OPT: Optimize for inactive cluster (num_nodes="+_rnk+", num_map_slots="+_rk+")." ); + if( LOG.isDebugEnabled() ) { + LOG.debug(getOptMode()+" OPT: Optimize w/ max_mem="+toMB(_lm)+"/"+toMB(_rm)+"/"+toMB(_rm2)+", max_k="+_lk+"/"+_rk+"/"+_rk2+")." ); + if( OptimizerUtils.isSparkExecutionMode() ) + LOG.debug(getOptMode()+" OPT: Optimize w/ "+SparkExecutionContext.getSparkClusterConfig().toString()); + if( _rnk <= 0 || _rk <= 0 ) + LOG.warn(getOptMode()+" OPT: Optimize for inactive cluster (num_nodes="+_rnk+", num_map_slots="+_rk+")." ); + } //ESTIMATE memory consumption pn.setSerialParFor(); //for basic mem consumption @@ -356,7 +360,7 @@ public class OptimizerRuleBased extends Optimizer _rk2 = _rk; //equal map/reduce unless we find counter-examples int cores = SparkExecutionContext.getDefaultParallelism(true) / SparkExecutionContext.getNumExecutors(); - int ccores = (int) Math.min(cores, _N); + int ccores = Math.max((int) Math.min(cores, _N), 1); _rm = SparkExecutionContext.getBroadcastMemoryBudget() / ccores; _rm2 = SparkExecutionContext.getBroadcastMemoryBudget() / ccores; }
