[SYSTEMML-1889] Robustness of parfor/hop memory budgets in spark exec mode

Recently, we encountered robustness issues with parfor memory budgets in
spark execution mode (the remote memory budget was set to Infinity). This
patch improves the robustness of analyzing the spark cluster
configuration as well as the parfor-specific infrastructure analysis. The
encountered issue was likely due to an invalid default parallelism (less
than the number of executors): the integer division dpar/#exec then
truncates to zero, and MEM/(dpar/#exec) evaluates to Infinity.
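
To make the failure mode concrete, here is a minimal, self-contained
Java sketch (class and variable names are hypothetical, not part of this
patch): with a default parallelism below the number of executors, the
integer division yields zero cores, and dividing a double memory budget
by zero silently produces Infinity rather than throwing.

    public class ParForBudgetSketch {
        public static void main(String[] args) {
            double memBudget = 10.0 * 1024 * 1024 * 1024; //hypothetical 10GB broadcast budget
            int dpar = 4, numExec = 8; //invalid config: default parallelism < #executors

            int cores = dpar / numExec; //integer division truncates to 0
            System.out.println(memBudget / cores); //prints Infinity (double division by zero does not throw)

            int ccores = Math.max(cores, 1); //robustness clamp, analogous to this patch
            System.out.println(memBudget / ccores); //finite per-core budget
        }
    }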

Furthermore, this patch updates the internal defaults of the min and max
storage memory fractions to the defaults used in Spark 2.1 and 2.2.
Additionally, parfor now also reports the entire Spark cluster
configuration when log=DEBUG, without unnecessarily creating the Spark
context.
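
As a rough sketch of how these fractions compose (the constants mirror
the diff below, but the helper method and the exact reserved-memory
cut-off handling are simplified assumptions), the broadcast budget is
derived as a fraction of the unified memory region:

    public class BroadcastBudgetSketch {
        //Spark's unified memory manager (>1.6) reserves 300MB of system memory
        private static final long RESERVED_SYSTEM_MEMORY_BYTES = 300L * 1024 * 1024;
        //fraction of the data space usable as broadcast budget (0.35 as of this patch)
        private static final double BROADCAST_DATA_FRACTION = 0.35;

        //hypothetical helper: per-executor broadcast budget in bytes
        public static double getBroadcastBudget(long memExecutor, double memDataMaxFrac) {
            double dataSpace = (memExecutor - RESERVED_SYSTEM_MEMORY_BYTES) * memDataMaxFrac;
            return dataSpace * BROADCAST_DATA_FRACTION; //e.g., 0.6 * 0.35 = 21% of usable memory
        }

        public static void main(String[] args) {
            //with the Spark 2.1/2.2 default spark.memory.fraction = 0.6
            System.out.println(getBroadcastBudget(8L * 1024 * 1024 * 1024, 0.6));
        }
    }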


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/d5f20b43
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/d5f20b43
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/d5f20b43

Branch: refs/heads/master
Commit: d5f20b43b9e74e73747ee65f11f9767577b001e8
Parents: e60a4c2
Author: Matthias Boehm <[email protected]>
Authored: Tue Sep 5 17:57:06 2017 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Tue Sep 5 21:18:10 2017 -0700

----------------------------------------------------------------------
 .../context/SparkExecutionContext.java          | 28 +++++++++++---------
 .../parfor/opt/OptimizerRuleBased.java          | 12 ++++++---
 2 files changed, 24 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/d5f20b43/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
index d1ff7d8..e0352b0 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
@@ -1382,7 +1382,7 @@ public class SparkExecutionContext extends ExecutionContext
        {
                //broadcasts are stored in mem-and-disk in data space, this config
                //defines the fraction of data space to be used as broadcast budget
-               private static final double BROADCAST_DATA_FRACTION = 0.3;
+               private static final double BROADCAST_DATA_FRACTION = 0.35;
 
                //forward private config from Spark's UnifiedMemoryManager.scala (>1.6)
                private static final long RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024;
@@ -1430,7 +1430,7 @@ public class SparkExecutionContext extends ExecutionContext
                        //always get the current num executors on refresh because this might
                        //change if not all executors are initially allocated and it is plan-relevant
                        int numExec = _numExecutors;
-                       if( refresh && !_confOnly ) {
+                       if( (refresh && !_confOnly) || isSparkContextCreated() ) {
                                JavaSparkContext jsc = getSparkContextStatic();
                                numExec = Math.max(jsc.sc().getExecutorMemoryStatus().size() - 1, 1);
                        }
@@ -1452,14 +1452,15 @@ public class SparkExecutionContext extends ExecutionContext
 
                        //always get the current default parallelism on refresh because this might
                        //change if not all executors are initially allocated and it is plan-relevant
-                       return ( refresh && !_confOnly ) ?
+                       int par = ( (refresh && !_confOnly) || isSparkContextCreated() ) ?
                                getSparkContextStatic().defaultParallelism() : _defaultPar;
+                       return Math.max(par, 1); //robustness min parallelism
                }
 
                public void analyzeSparkConfiguationLegacy(SparkConf conf)  {
                        //ensure allocated spark conf
                        SparkConf sconf = (conf == null) ? createSystemMLSparkConf() : conf;
-
+                       
                        //parse absolute executor memory
                        _memExecutor = UtilFunctions.parseMemorySize(
                                        sconf.get("spark.executor.memory", "1g"));
@@ -1477,7 +1478,7 @@ public class SparkExecutionContext extends ExecutionContext
                public void analyzeSparkConfiguation(SparkConf conf) {
                        //ensure allocated spark conf
                        SparkConf sconf = (conf == null) ? createSystemMLSparkConf() : conf;
-
+                       
                        //parse absolute executor memory, incl fixed cut off
                        _memExecutor = UtilFunctions.parseMemorySize(
                                        sconf.get("spark.executor.memory", "1g"))
@@ -1485,14 +1486,17 @@ public class SparkExecutionContext extends ExecutionContext
 
                        //get data and shuffle memory ratios (defaults not specified in job conf)
                        _memDataMinFrac = sconf.getDouble("spark.memory.storageFraction", 0.5); //default 50%
-                       _memDataMaxFrac = sconf.getDouble("spark.memory.fraction", 0.75); //default 75%
-                       _memBroadcastFrac = _memDataMaxFrac * BROADCAST_DATA_FRACTION; //default 22.5%
-
+                       _memDataMaxFrac = sconf.getDouble("spark.memory.fraction", 0.6); //default 60%
+                       _memBroadcastFrac = _memDataMaxFrac * BROADCAST_DATA_FRACTION; //default 21%
+                       
                        //analyze spark degree of parallelism
                        analyzeSparkParallelismConfiguation(sconf);
                }
 
-               private void analyzeSparkParallelismConfiguation(SparkConf sconf) {
+               private void analyzeSparkParallelismConfiguation(SparkConf conf) {
+                       //ensure allocated spark conf
+                       SparkConf sconf = (conf == null) ? createSystemMLSparkConf() : conf;
+                       
                        int numExecutors = sconf.getInt("spark.executor.instances", -1);
                        int numCoresPerExec = sconf.getInt("spark.executor.cores", -1);
                        int defaultPar = sconf.getInt("spark.default.parallelism", -1);
@@ -1532,14 +1536,14 @@ public class SparkExecutionContext extends ExecutionContext
                @Override
                public String toString() {
                        StringBuilder sb = new StringBuilder("SparkClusterConfig: \n");
-                       sb.append("-- legacyVersion    = " + _legacyVersion + " ("+getSparkContextStatic().version()+")\n" );
+                       sb.append("-- legacyVersion    = " + _legacyVersion + " ("+getSparkVersionString()+")\n" );
                        sb.append("-- confOnly         = " + _confOnly + "\n");
+                       sb.append("-- numExecutors     = " + _numExecutors + "\n");
+                       sb.append("-- defaultPar       = " + _defaultPar + "\n");
                        sb.append("-- memExecutor      = " + _memExecutor + "\n");
                        sb.append("-- memDataMinFrac   = " + _memDataMinFrac + "\n");
                        sb.append("-- memDataMaxFrac   = " + _memDataMaxFrac + "\n");
                        sb.append("-- memBroadcastFrac = " + _memBroadcastFrac + "\n");
-                       sb.append("-- numExecutors     = " + _numExecutors + "\n");
-                       sb.append("-- defaultPar       = " + _defaultPar + "\n");
                        return sb.toString();
                }
        }
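
For reference, the DEBUG report produced by the reordered toString()
above looks roughly as follows (all values are hypothetical examples,
not actual output from this commit):

    SparkClusterConfig: 
    -- legacyVersion    = false (2.1.0)
    -- confOnly         = true
    -- numExecutors     = 8
    -- defaultPar       = 16
    -- memExecutor      = 1073741824
    -- memDataMinFrac   = 0.5
    -- memDataMaxFrac   = 0.6
    -- memBroadcastFrac = 0.21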

http://git-wip-us.apache.org/repos/asf/systemml/blob/d5f20b43/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
index c429bfa..ec229d5 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
@@ -213,9 +213,13 @@ public class OptimizerRuleBased extends Optimizer
                _cost = est;
                
                //debug and warnings output
-               LOG.debug(getOptMode()+" OPT: Optimize w/ max_mem="+toMB(_lm)+"/"+toMB(_rm)+"/"+toMB(_rm2)+", max_k="+_lk+"/"+_rk+"/"+_rk2+")." );
-               if( _rnk<=0 || _rk<=0 )
-                       LOG.warn(getOptMode()+" OPT: Optimize for inactive cluster (num_nodes="+_rnk+", num_map_slots="+_rk+")." );
+               if( LOG.isDebugEnabled() ) {
+                       LOG.debug(getOptMode()+" OPT: Optimize w/ max_mem="+toMB(_lm)+"/"+toMB(_rm)+"/"+toMB(_rm2)+", max_k="+_lk+"/"+_rk+"/"+_rk2+")." );
+                       if( OptimizerUtils.isSparkExecutionMode() )
+                               LOG.debug(getOptMode()+" OPT: Optimize w/ "+SparkExecutionContext.getSparkClusterConfig().toString());
+                       if( _rnk <= 0 || _rk <= 0 )
+                               LOG.warn(getOptMode()+" OPT: Optimize for inactive cluster (num_nodes="+_rnk+", num_map_slots="+_rk+")." );
+               }
                
                //ESTIMATE memory consumption 
                pn.setSerialParFor(); //for basic mem consumption 
@@ -356,7 +360,7 @@ public class OptimizerRuleBased extends Optimizer
                        _rk2 = _rk; //equal map/reduce unless we find counter-examples 
                        int cores = SparkExecutionContext.getDefaultParallelism(true)
                                        / SparkExecutionContext.getNumExecutors();
-                       int ccores = (int) Math.min(cores, _N);
+                       int ccores = Math.max((int) Math.min(cores, _N), 1);
                        _rm  = SparkExecutionContext.getBroadcastMemoryBudget() / ccores;
                        _rm2 = SparkExecutionContext.getBroadcastMemoryBudget() / ccores;
                }
