[SYSTEMML-418] Generalized spark memory budgets (support Spark>=1.6)

This patch generalizes the handling of Spark memory budgets to support
legacy (< Spark 1.6) and current (Spark 1.6 and 2.0) releases at the
same time. The major difference between these releases is the unified
memory management introduced in Spark 1.6 (i.e., Spark >= 1.6).
Furthermore, this patch also includes fixes for broadcast memory budgets
(now taken out of the data space rather than the user space) and for
defaults like the Spark executor memory (1g instead of 512m), as well as
simplifications regarding version comparisons and memory size parsing.

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/3ef04409
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/3ef04409
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/3ef04409

Branch: refs/heads/master
Commit: 3ef044092855739841a59416a8070c18c609a474
Parents: 3ab79dc
Author: Matthias Boehm <[email protected]>
Authored: Tue Apr 26 01:01:06 2016 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Tue Apr 26 03:29:58 2016 -0700

----------------------------------------------------------------------
 src/main/java/org/apache/sysml/hops/Hop.java    |   2 +-
 .../context/SparkExecutionContext.java          | 455 ++++++++++++-------
 .../parfor/stat/InfrastructureAnalyzer.java     |  31 +-
 .../sysml/runtime/util/UtilFunctions.java       |  39 ++
 .../java/org/apache/sysml/utils/Explain.java    |  21 +-
 5 files changed, 348 insertions(+), 200 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/3ef04409/src/main/java/org/apache/sysml/hops/Hop.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/Hop.java 
b/src/main/java/org/apache/sysml/hops/Hop.java
index f5b0fe6..782be9a 100644
--- a/src/main/java/org/apache/sysml/hops/Hop.java
+++ b/src/main/java/org/apache/sysml/hops/Hop.java
@@ -375,7 +375,7 @@ public abstract class Hop
                                boolean serializedStorage = false;
                                if( dimsKnown(true) && 
!Checkpoint.CHECKPOINT_SPARSE_CSR ) {
                                        double matrixPSize = 
OptimizerUtils.estimatePartitionedSizeExactSparsity(_dim1, _dim2, 
_rows_in_block, _cols_in_block, _nnz);
-                                       double dataCache = 
SparkExecutionContext.getConfiguredTotalDataMemory(true);
+                                       double dataCache = 
SparkExecutionContext.getDataMemoryBudget(true, true);
                                        serializedStorage = 
(MatrixBlock.evalSparseFormatInMemory(_dim1, _dim2, _nnz)
                                                                     && 
matrixPSize > dataCache ); //sparse in-memory does not fit in agg mem 
                                }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/3ef04409/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
index d3134b3..d6da82f 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
@@ -27,6 +27,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -69,33 +71,36 @@ import org.apache.sysml.runtime.matrix.data.OutputInfo;
 import org.apache.sysml.runtime.matrix.data.SparseBlock;
 import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
 import org.apache.sysml.runtime.util.MapReduceTool;
+import org.apache.sysml.runtime.util.UtilFunctions;
 import org.apache.sysml.utils.Statistics;
 
 
 public class SparkExecutionContext extends ExecutionContext
 {
-
        private static final Log LOG = 
LogFactory.getLog(SparkExecutionContext.class.getName());
-
+       private static final boolean LDEBUG = false; //local debug flag
+       
        //internal configurations 
        private static boolean LAZY_SPARKCTX_CREATION = true;
        private static boolean ASYNCHRONOUS_VAR_DESTROY = true;
        private static boolean FAIR_SCHEDULER_MODE = true;
        
        //executor memory and relative fractions as obtained from the spark 
configuration
-       private static long _memExecutors = -1; //mem per executors
-       private static double _memRatioData = -1; 
-       private static double _memRatioShuffle = -1;
-       private static int _numExecutors = -1; //total executors
-       private static int _defaultPar = -1; //total vcores  
-       private static boolean _confOnly = false; //infrastructure info based 
on config
+       private static SparkClusterConfig _sconf = null;
        
        // Only one SparkContext may be active per JVM. You must stop() the 
active SparkContext before creating a new one. 
        // This limitation may eventually be removed; see SPARK-2243 for more 
details.
        private static JavaSparkContext _spctx = null; 
        
-       protected SparkExecutionContext(Program prog) 
-       {
+       static {
+               // for internal debugging only
+               if( LDEBUG ) {
+                       
Logger.getLogger("org.apache.sysml.runtime.controlprogram.context")
+                                 .setLevel((Level) Level.DEBUG);
+               }
+       }
+       
+       protected SparkExecutionContext(Program prog) {
                //protected constructor to force use of ExecutionContextFactory
                this( true, prog );
        }
@@ -806,161 +811,6 @@ public class SparkExecutionContext extends 
ExecutionContext
                return nnz;
        }
        
-       
-       /**
-        * Returns the available memory budget for broadcast variables in bytes.
-        * In detail, this takes into account the total executor memory as well
-        * as relative ratios for data and shuffle. Note, that this is a 
conservative
-        * estimate since both data memory and shuffle memory might not be fully
-        * utilized. 
-        * 
-        * @return
-        */
-       public static double getBroadcastMemoryBudget()
-       {
-               if( _memExecutors < 0 || _memRatioData < 0 || _memRatioShuffle 
< 0 )
-                       analyzeSparkConfiguation();
-               
-               //70% of remaining free memory
-               double membudget = OptimizerUtils.MEM_UTIL_FACTOR *
-                                     (  _memExecutors 
-                                      - 
_memExecutors*(_memRatioData+_memRatioShuffle) );
-               
-               return membudget;
-       }
-       
-       /**
-        * 
-        * @return
-        */
-       public static double getConfiguredTotalDataMemory() {
-               return getConfiguredTotalDataMemory(false);
-       }
-       
-       /**
-        * 
-        * @param refresh
-        * @return
-        */
-       public static double getConfiguredTotalDataMemory(boolean refresh)
-       {
-               if( _memExecutors < 0 || _memRatioData < 0 )
-                       analyzeSparkConfiguation();
-               
-               //always get the current num executors on refresh because this 
might 
-               //change if not all executors are initially allocated and it is 
plan-relevant
-               if( refresh && !_confOnly ) {
-                       JavaSparkContext jsc = getSparkContextStatic();
-                       int numExec = 
Math.max(jsc.sc().getExecutorMemoryStatus().size() - 1, 1);
-                       return _memExecutors * _memRatioData * numExec; 
-               }
-               else
-                       return ( _memExecutors * _memRatioData * _numExecutors 
);
-       }
-       
-       public static int getNumExecutors()
-       {
-               if( _numExecutors < 0 )
-                       analyzeSparkConfiguation();
-               
-               return _numExecutors;
-       }
-       
-       public static int getDefaultParallelism() {
-               return getDefaultParallelism(false);
-       }
-       
-       /**
-        * 
-        * @return
-        */
-       public static int getDefaultParallelism(boolean refresh) 
-       {
-               if( _defaultPar < 0 && !refresh )
-                       analyzeSparkConfiguation();
-               
-               //always get the current default parallelism on refresh because 
this might 
-               //change if not all executors are initially allocated and it is 
plan-relevant
-               if( refresh && !_confOnly )
-                       return getSparkContextStatic().defaultParallelism();
-               else
-                       return _defaultPar;
-       }
-       
-       /**
-        * 
-        */
-       public static void analyzeSparkConfiguation() 
-       {
-               SparkConf sconf = new SparkConf();
-               
-               //parse absolute executor memory
-               String tmp = sconf.get("spark.executor.memory", "512m");
-               if ( tmp.endsWith("g") || tmp.endsWith("G") )
-                       _memExecutors = 
Long.parseLong(tmp.substring(0,tmp.length()-1)) * 1024 * 1024 * 1024;
-               else if ( tmp.endsWith("m") || tmp.endsWith("M") )
-                       _memExecutors = 
Long.parseLong(tmp.substring(0,tmp.length()-1)) * 1024 * 1024;
-               else if( tmp.endsWith("k") || tmp.endsWith("K") )
-                       _memExecutors = 
Long.parseLong(tmp.substring(0,tmp.length()-1)) * 1024;
-               else 
-                       _memExecutors = 
Long.parseLong(tmp.substring(0,tmp.length()-2));
-               
-               //get data and shuffle memory ratios (defaults not specified in 
job conf)
-               _memRatioData = sconf.getDouble("spark.storage.memoryFraction", 
0.6); //default 60%
-               _memRatioShuffle = 
sconf.getDouble("spark.shuffle.memoryFraction", 0.2); //default 20%
-               
-               int numExecutors = sconf.getInt("spark.executor.instances", -1);
-               int numCoresPerExec = sconf.getInt("spark.executor.cores", -1);
-               int defaultPar = sconf.getInt("spark.default.parallelism", -1);
-               
-               if( numExecutors > 1 && (defaultPar > 1 || numCoresPerExec > 1) 
) {
-                       _numExecutors = numExecutors;
-                       _defaultPar = (defaultPar>1) ? defaultPar : 
numExecutors * numCoresPerExec;
-                       _confOnly = true;
-               }
-               else {
-                       //get default parallelism (total number of executors 
and cores)
-                       //note: spark context provides this information while 
conf does not
-                       //(for num executors we need to correct for driver and 
local mode)
-                       JavaSparkContext jsc = getSparkContextStatic();
-                       _numExecutors = 
Math.max(jsc.sc().getExecutorMemoryStatus().size() - 1, 1);  
-                       _defaultPar = jsc.defaultParallelism();
-                       _confOnly = false; //implies env info refresh w/ spark 
context 
-               }
-               
-               //note: required time for infrastructure analysis on 5 node 
cluster: ~5-20ms. 
-       }
-
-       /**
-        * 
-        */
-       public void checkAndRaiseValidationWarningJDKVersion()
-       {
-               //check for jdk version less than jdk8
-               boolean isLtJDK8 = 
InfrastructureAnalyzer.isJavaVersionLessThanJDK8();
-
-               //check multi-threaded executors
-               int numExecutors = getNumExecutors();
-               int numCores = getDefaultParallelism();
-               boolean multiThreaded = (numCores > numExecutors);
-               
-               //check for jdk version less than 8 (and raise warning if 
multi-threaded)
-               if( isLtJDK8 && multiThreaded) 
-               {
-                       //get the jre version 
-                       String version = System.getProperty("java.version");
-                       
-                       
LOG.warn("########################################################################################");
-                       LOG.warn("### WARNING: Multi-threaded text reblock may 
lead to thread contention on JRE < 1.8 ####");
-                       LOG.warn("### java.version = " + version);
-                       LOG.warn("### total number of executors = " + 
numExecutors);
-                       LOG.warn("### total number of cores = " + numCores);
-                       LOG.warn("### JDK-7032154: Performance tuning of 
sun.misc.FloatingDecimal/FormattedFloatingDecimal");
-                       LOG.warn("### Workaround: Convert text to binary w/ 
changed configuration of one executor per core");
-                       
LOG.warn("########################################################################################");
-               }
-       }
-       
        ///////////////////////////////////////////
        // Cleanup of RDDs and Broadcast variables
        ///////
@@ -1246,6 +1096,7 @@ public class SparkExecutionContext extends 
ExecutionContext
                return false;
        }
        
+       
        ///////////////////////////////////////////
        // Debug String Handling (see explain); TODO to be removed
        ///////
@@ -1353,4 +1204,278 @@ public class SparkExecutionContext extends 
ExecutionContext
                
        }
        
+
+       ///////////////////////////////////////////
+       // Spark configuration handling 
+       ///////
+
+       /**
+        * Obtains the lazily analyzed spark cluster configuration. 
+        * 
+        * @return
+        */
+       public static SparkClusterConfig getSparkClusterConfig() {
+               //lazy creation of spark cluster config         
+               if( _sconf == null )
+                       _sconf = new SparkClusterConfig();
+               return _sconf;
+       }
+       
+       /**
+        * Obtains the available memory budget for broadcast variables in bytes.
+        * 
+        * @return
+        */
+       public static double getBroadcastMemoryBudget() {
+               return getSparkClusterConfig()
+                       .getBroadcastMemoryBudget();
+       }
+       
+       /**
+        * Obtain the available memory budget for data storage in bytes.
+        * 
+        * @param min      flag for minimum data budget 
+        * @param refresh  flag for refresh with spark context
+        * @return
+        */
+       public static double getDataMemoryBudget(boolean min, boolean refresh) {
+               return getSparkClusterConfig()
+                       .getDataMemoryBudget(min, refresh);
+       }
+       
+       /**
+        * Obtain the number of executors in the cluster (excluding the driver).
+        * 
+        * @return
+        */
+       public static int getNumExecutors() {
+               return getSparkClusterConfig()
+                       .getNumExecutors();
+       }
+       
+       /**
+        * Obtain the default degree of parallelism (cores in the cluster). 
+        * 
+        * @param refresh  flag for refresh with spark context 
+        * @return
+        */
+       public static int getDefaultParallelism(boolean refresh) {
+               return getSparkClusterConfig()
+                       .getDefaultParallelism(refresh);
+       }
+       
+       /**
+        * 
+        */
+       public void checkAndRaiseValidationWarningJDKVersion()
+       {
+               //check for jdk version less than jdk8
+               boolean isLtJDK8 = 
InfrastructureAnalyzer.isJavaVersionLessThanJDK8();
+
+               //check multi-threaded executors
+               int numExecutors = getNumExecutors();
+               int numCores = getDefaultParallelism(false);
+               boolean multiThreaded = (numCores > numExecutors);
+               
+               //check for jdk version less than 8 (and raise warning if 
multi-threaded)
+               if( isLtJDK8 && multiThreaded) 
+               {
+                       //get the jre version 
+                       String version = System.getProperty("java.version");
+                       
+                       
LOG.warn("########################################################################################");
+                       LOG.warn("### WARNING: Multi-threaded text reblock may 
lead to thread contention on JRE < 1.8 ####");
+                       LOG.warn("### java.version = " + version);
+                       LOG.warn("### total number of executors = " + 
numExecutors);
+                       LOG.warn("### total number of cores = " + numCores);
+                       LOG.warn("### JDK-7032154: Performance tuning of 
sun.misc.FloatingDecimal/FormattedFloatingDecimal");
+                       LOG.warn("### Workaround: Convert text to binary w/ 
changed configuration of one executor per core");
+                       
LOG.warn("########################################################################################");
+               }
+       }
+       
+       /**
+        * Captures relevant spark cluster configuration properties, e.g., 
memory budgets and 
+        * degree of parallelism. This configuration abstracts legacy (< Spark 
1.6) and current
+        * configurations and provides a unified view. 
+        */
+       private static class SparkClusterConfig 
+       {
+               //broadcasts are stored in mem-and-disk in data space, this 
config
+               //defines the fraction of data space to be used as broadcast 
budget
+               private static final double BROADCAST_DATA_FRACTION = 0.3;
+               
+               //forward private config from Spark's 
UnifiedMemoryManager.scala (>1.6)
+               private static final long RESERVED_SYSTEM_MEMORY_BYTES = 300 * 
1024 * 1024;
+               
+               //meta configurations
+               private boolean _legacyVersion = false; //spark version <1.6
+               private boolean _confOnly = false; //infrastructure info based 
on config
+               
+               //memory management configurations
+               private long _memExecutor = -1; //mem per executor
+               private double _memDataMinFrac = -1; //minimum data fraction
+               private double _memDataMaxFrac = -1; //maximum data fraction
+               private double _memBroadcastFrac = -1; //broadcast fraction
+               
+               //degree of parallelism configurations
+               private int _numExecutors = -1; //total executors
+               private int _defaultPar = -1; //total vcores  
+       
+               public SparkClusterConfig() 
+               {
+                       SparkConf sconf = new SparkConf();
+                       
+                       //parse version and config //TODO avoid spark context 
creation if possible
+                       JavaSparkContext jsc = getSparkContextStatic();
+                       _legacyVersion = 
(UtilFunctions.compareVersion(jsc.version(), "1.6.0") < 0
+                                       || 
sconf.getBoolean("spark.memory.useLegacyMode", false) );
+                       
+                       //obtain basic spark configurations
+                       if( _legacyVersion )
+                               analyzeSparkConfiguationLegacy(sconf);
+                       else
+                               analyzeSparkConfiguation(sconf);
+       
+                       //log debug of created spark cluster config
+                       if( LOG.isDebugEnabled() )
+                               LOG.debug( this.toString() );
+               }
+               
+               /**
+                * 
+                * @return
+                */
+               public long getBroadcastMemoryBudget() {
+                       return (long) (_memExecutor * _memBroadcastFrac);
+               }
+               
+               /**
+                * 
+                * @param min
+                * @param refresh
+                * @return
+                */
+               public long getDataMemoryBudget(boolean min, boolean refresh) {
+                       //always get the current num executors on refresh 
because this might 
+                       //change if not all executors are initially allocated 
and it is plan-relevant
+                       int numExec = _numExecutors;
+                       if( refresh && !_confOnly ) {
+                               JavaSparkContext jsc = getSparkContextStatic();
+                               numExec = 
Math.max(jsc.sc().getExecutorMemoryStatus().size() - 1, 1);
+                       }
+                       
+                       //compute data memory budget
+                       return (long) ( numExec * _memExecutor *
+                               (min ? _memDataMinFrac : _memDataMaxFrac) );    
+               }
+
+               /**
+                * 
+                * @return
+                */
+               public int getNumExecutors() {
+                       if( _numExecutors < 0 )
+                               analyzeSparkParallelismConfiguation(null);      
                
+                       return _numExecutors;
+               }
+               
+               /**
+                * 
+                * @param refresh
+                * @return
+                */
+               public int getDefaultParallelism(boolean refresh) {
+                       if( _defaultPar < 0 && !refresh )
+                               analyzeSparkParallelismConfiguation(null);
+                       
+                       //always get the current default parallelism on refresh 
because this might 
+                       //change if not all executors are initially allocated 
and it is plan-relevant
+                       return ( refresh && !_confOnly ) ?
+                               getSparkContextStatic().defaultParallelism() : 
_defaultPar;
+               }
+
+               /**
+                * 
+                * @param conf
+                */
+               public void analyzeSparkConfiguationLegacy(SparkConf conf)  {
+                       //ensure allocated spark conf
+                       SparkConf sconf = (conf == null) ? new SparkConf() : 
conf;
+                       
+                       //parse absolute executor memory
+                       _memExecutor = UtilFunctions.parseMemorySize(
+                                       sconf.get("spark.executor.memory", 
"1g"));
+                       
+                       //get data and shuffle memory ratios (defaults not 
specified in job conf)
+                       double dataFrac = 
sconf.getDouble("spark.storage.memoryFraction", 0.6); //default 60%
+                       _memDataMinFrac = dataFrac;
+                       _memDataMaxFrac = dataFrac;
+                       _memBroadcastFrac = dataFrac * BROADCAST_DATA_FRACTION; 
//default 18%
+                       
+                       //analyze spark degree of parallelism 
+                       analyzeSparkParallelismConfiguation(sconf);     
+               }
+               
+               /**
+                * 
+                * @param conf
+                */
+               public void analyzeSparkConfiguation(SparkConf conf) {
+                       //ensure allocated spark conf
+                       SparkConf sconf = (conf == null) ? new SparkConf() : 
conf;
+                       
+                       //parse absolute executor memory, incl fixed cut off
+                       _memExecutor = UtilFunctions.parseMemorySize(
+                                       sconf.get("spark.executor.memory", 
"1g")) 
+                                       - RESERVED_SYSTEM_MEMORY_BYTES;
+                       
+                       //get data and shuffle memory ratios (defaults not 
specified in job conf)
+                       _memDataMinFrac = 
sconf.getDouble("spark.memory.storageFraction", 0.5); //default 50%
+                       _memDataMaxFrac = 
sconf.getDouble("spark.memory.fraction", 0.75); //default 75%
+                       _memBroadcastFrac = _memDataMaxFrac * 
BROADCAST_DATA_FRACTION; //default 22.5%
+                       
+                       //analyze spark degree of parallelism 
+                       analyzeSparkParallelismConfiguation(sconf);
+               }
+               
+               /**
+                * 
+                * @param sconf
+                */
+               private void analyzeSparkParallelismConfiguation(SparkConf 
sconf) {
+                       int numExecutors = 
sconf.getInt("spark.executor.instances", -1);
+                       int numCoresPerExec = 
sconf.getInt("spark.executor.cores", -1);
+                       int defaultPar = 
sconf.getInt("spark.default.parallelism", -1);
+                       
+                       if( numExecutors > 1 && (defaultPar > 1 || 
numCoresPerExec > 1) ) {
+                               _numExecutors = numExecutors;
+                               _defaultPar = (defaultPar>1) ? defaultPar : 
numExecutors * numCoresPerExec;
+                               _confOnly = true;
+                       }
+                       else {
+                               //get default parallelism (total number of 
executors and cores)
+                               //note: spark context provides this information 
while conf does not
+                               //(for num executors we need to correct for 
driver and local mode)
+                               JavaSparkContext jsc = getSparkContextStatic();
+                               _numExecutors = 
Math.max(jsc.sc().getExecutorMemoryStatus().size() - 1, 1);  
+                               _defaultPar = jsc.defaultParallelism();
+                               _confOnly = false; //implies env info refresh 
w/ spark context 
+                       }
+               }
+               
+               @Override
+               public String toString() {
+                       StringBuilder sb = new 
StringBuilder("SparkClusterConfig: \n");
+                       sb.append("-- legacyVersion    = " + _legacyVersion + " 
("+getSparkContextStatic().version()+")\n" );
+                       sb.append("-- confOnly         = " + _confOnly + "\n");
+                       sb.append("-- memExecutor      = " + _memExecutor + 
"\n");
+                       sb.append("-- memDataMinFrac   = " + _memDataMinFrac + 
"\n");
+                       sb.append("-- memDataMaxFrac   = " + _memDataMaxFrac + 
"\n");
+                       sb.append("-- memBroadcastFrac = " + _memBroadcastFrac 
+ "\n");
+                       sb.append("-- numExecutors     = " + _numExecutors + 
"\n");
+                       sb.append("-- defaultPar       = " + _defaultPar + 
"\n");               
+                       return sb.toString();
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/3ef04409/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java
index 3d595b8..75c96ba 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
+import org.apache.sysml.runtime.util.UtilFunctions;
 
 /**
  * Central place for analyzing and obtaining static infrastructure properties
@@ -347,31 +348,23 @@ public class InfrastructureAnalyzer
                try
                {
                        StringTokenizer st = new StringTokenizer( javaOpts, " " 
);
-                       while( st.hasMoreTokens() )
-                       {
+                       while( st.hasMoreTokens() ) {
                                String arg = st.nextToken();
                                if( !arg.startsWith("-Xmx") ) //search for max 
mem
                                        continue;
                                
-                               arg = arg.substring(4); //cut off "-Xmx"
+                               //cut off "-Xmx" parameter
+                               arg = arg.substring(4);
+                               
                                //parse number and unit
-                               if ( arg.endsWith("g") || arg.endsWith("G") )
-                                       ret = 
Long.parseLong(arg.substring(0,arg.length()-1)) * 1024 * 1024 * 1024;
-                               else if ( arg.endsWith("m") || 
arg.endsWith("M") )
-                                       ret = 
Long.parseLong(arg.substring(0,arg.length()-1)) * 1024 * 1024;
-                               else if( arg.endsWith("k") || arg.endsWith("K") 
)
-                                       ret = 
Long.parseLong(arg.substring(0,arg.length()-1)) * 1024;
-                               else 
-                                       ret = 
Long.parseLong(arg.substring(0,arg.length()-2));
+                               ret = UtilFunctions.parseMemorySize(arg); 
                        }
                        
-                       if( ret < 0 ) // no argument found
-                       {
+                       if( ret < 0 ) { // no argument found
                                ret = DEFAULT_JVM_SIZE;
                        }
                }
-               catch(Exception ex)
-               {
+               catch(Exception ex) {
                        //if anything breaks during parsing (e.g., because args 
not specified correctly)
                        ret = DEFAULT_JVM_SIZE;
                }
@@ -464,14 +457,8 @@ public class InfrastructureAnalyzer
                //step 2: analyze if used jdk older than jdk8
                String version = System.getProperty("java.version");
                
-               //parse jre version
-               int ix1 = version.indexOf('.');
-               int ix2 = version.indexOf('.', ix1+1);
-               int versionp1 = Integer.parseInt(version.substring(0, ix1));
-               int versionp2 = Integer.parseInt(version.substring(ix1+1, ix2));
-               
                //check for jdk version less than 8 (and raise warning if 
multi-threaded)
-               _isLtJDK8 = (versionp1 == 1 && versionp2 < 8);
+               _isLtJDK8 = (UtilFunctions.compareVersion(version, "1.8") < 0); 
        }
        
        /**

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/3ef04409/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java 
b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
index 241b178..e292b39 100644
--- a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
+++ b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
@@ -353,6 +353,27 @@ public class UtilFunctions
                }
        }
        
+       /**
+        * Compares two version strings of format x.y.z, where x is major,
+        * y is minor, and z is maintenance release. Only the leading components
+        * present in both strings are compared, as integers ("1.8" vs "1.8.0_45" yields 0).
+        * @param version1 first version string, split on '.' into numeric components
+        * @param version2 second version string, split on '.' into numeric components
+        * @return Integer.compareTo result of the first differing component, or 0 if none differ
+        */
+       public static int compareVersion( String version1, String version2 ) {
+               String[] partsv1 = version1.split("\\.");
+               String[] partsv2 = version2.split("\\.");
+               int len = Math.min(partsv1.length, partsv2.length); //trailing components of the longer version are ignored
+               for( int i=0; i<partsv1.length && i<len; i++ ) { //NOTE(review): i<partsv1.length is already implied by i<len
+                       Integer iv1 = Integer.parseInt(partsv1[i]); //throws NumberFormatException on non-numeric components
+                       Integer iv2 = Integer.parseInt(partsv2[i]);
+                       if( iv1.compareTo(iv2) != 0 )
+                               return iv1.compareTo(iv2);
+               }               
+               return 0; //all compared components are equal
+       }
+       
        public static boolean isIntegerNumber( String str )
        {
                byte[] c = str.getBytes();
@@ -402,4 +423,22 @@ public class UtilFunctions
                }
                return sb.toString();
        }
+       
+       /**
+        * Parses a memory size with optional g/m/k quantifiers into its
+        * number representation, scaling by 1024^3, 1024^2, or 1024 respectively.
+        * 
+        * @param arg size string such as "2g", "512M", or "1024"; only a single-letter, case-insensitive suffix is recognized
+        * @return the parsed number multiplied by the suffix factor (presumably a byte count; TODO confirm against callers)
+        */
+       public static long parseMemorySize(String arg) {
+               if ( arg.endsWith("g") || arg.endsWith("G") ) //strip the suffix, scale by 1024^3
+                       return Long.parseLong(arg.substring(0,arg.length()-1)) 
* 1024 * 1024 * 1024;
+               else if ( arg.endsWith("m") || arg.endsWith("M") ) //strip the suffix, scale by 1024^2
+                       return Long.parseLong(arg.substring(0,arg.length()-1)) 
* 1024 * 1024;
+               else if( arg.endsWith("k") || arg.endsWith("K") ) //strip the suffix, scale by 1024
+                       return Long.parseLong(arg.substring(0,arg.length()-1)) 
* 1024;
+               else //no recognized suffix: parse as plain number; other suffixes (e.g., "gb") raise NumberFormatException
+                       return Long.parseLong(arg.substring(0,arg.length()));
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/3ef04409/src/main/java/org/apache/sysml/utils/Explain.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/utils/Explain.java 
b/src/main/java/org/apache/sysml/utils/Explain.java
index c070378..6b266ea 100644
--- a/src/main/java/org/apache/sysml/utils/Explain.java
+++ b/src/main/java/org/apache/sysml/utils/Explain.java
@@ -117,14 +117,14 @@ public class Explain
                
                if( OptimizerUtils.isSparkExecutionMode() )
                {
-                       if( counts.numJobs-counts.numReblocks == 0 )
-                       {
+                       if( counts.numJobs-counts.numReblocks == 0 ) {
                                //avoid unnecessary lazy spark context creation 
on access to memory configurations
-                               sb.append( "?MB/?MB" );
+                               sb.append( "?MB/?MB/?MB" );
                        }
-                       else //default
-                       {
-                               sb.append( 
OptimizerUtils.toMB(SparkExecutionContext.getConfiguredTotalDataMemory()) );
+                       else { //default
+                               sb.append( 
OptimizerUtils.toMB(SparkExecutionContext.getDataMemoryBudget(true, false)) );
+                               sb.append( "MB/" );
+                               sb.append( 
OptimizerUtils.toMB(SparkExecutionContext.getDataMemoryBudget(false, false)) );
                                sb.append( "MB/" );
                                sb.append( 
OptimizerUtils.toMB(SparkExecutionContext.getBroadcastMemoryBudget()) );
                                sb.append( "MB" );
@@ -165,15 +165,12 @@ public class Explain
                
                if( OptimizerUtils.isSparkExecutionMode() ) //SP
                {
-                       if( counts.numJobs-counts.numReblocks == 0 )
-                       {
+                       if( counts.numJobs-counts.numReblocks == 0 ) {
                                //avoid unnecessary lazy spark context creation 
on access to memory configurations
                                sb.append( "?" );
                        }
-                       else //default
-                       {
-                               int rk = 
SparkExecutionContext.getDefaultParallelism(); 
-                               sb.append( rk );        
+                       else { //default
+                               sb.append( 
SparkExecutionContext.getDefaultParallelism(false) );        
                        }
                }
                else //MR

Reply via email to