[SYSTEMML-418] Generalized spark memory budgets (support Spark>=1.6) This patch generalizes the handling of spark memory budgets to support legacy (< Spark 1.6) and current (Spark 1.6 and 2.0) releases at the same time. The major difference between these releases is the unified memory management introduced in Spark >=1.6. Furthermore, this patch also includes fixes for broadcast memory budgets (now taken out of data space rather than user space) and for defaults like spark executor memory (1g instead of 512m), as well as simplifications regarding version comparisons and memory size parsing.
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/3ef04409 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/3ef04409 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/3ef04409 Branch: refs/heads/master Commit: 3ef044092855739841a59416a8070c18c609a474 Parents: 3ab79dc Author: Matthias Boehm <[email protected]> Authored: Tue Apr 26 01:01:06 2016 -0700 Committer: Matthias Boehm <[email protected]> Committed: Tue Apr 26 03:29:58 2016 -0700 ---------------------------------------------------------------------- src/main/java/org/apache/sysml/hops/Hop.java | 2 +- .../context/SparkExecutionContext.java | 455 ++++++++++++------- .../parfor/stat/InfrastructureAnalyzer.java | 31 +- .../sysml/runtime/util/UtilFunctions.java | 39 ++ .../java/org/apache/sysml/utils/Explain.java | 21 +- 5 files changed, 348 insertions(+), 200 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/3ef04409/src/main/java/org/apache/sysml/hops/Hop.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/Hop.java b/src/main/java/org/apache/sysml/hops/Hop.java index f5b0fe6..782be9a 100644 --- a/src/main/java/org/apache/sysml/hops/Hop.java +++ b/src/main/java/org/apache/sysml/hops/Hop.java @@ -375,7 +375,7 @@ public abstract class Hop boolean serializedStorage = false; if( dimsKnown(true) && !Checkpoint.CHECKPOINT_SPARSE_CSR ) { double matrixPSize = OptimizerUtils.estimatePartitionedSizeExactSparsity(_dim1, _dim2, _rows_in_block, _cols_in_block, _nnz); - double dataCache = SparkExecutionContext.getConfiguredTotalDataMemory(true); + double dataCache = SparkExecutionContext.getDataMemoryBudget(true, true); serializedStorage = (MatrixBlock.evalSparseFormatInMemory(_dim1, _dim2, _nnz) && matrixPSize > dataCache ); 
//sparse in-memory does not fit in agg mem } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/3ef04409/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java index d3134b3..d6da82f 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java @@ -27,6 +27,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -69,33 +71,36 @@ import org.apache.sysml.runtime.matrix.data.OutputInfo; import org.apache.sysml.runtime.matrix.data.SparseBlock; import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration; import org.apache.sysml.runtime.util.MapReduceTool; +import org.apache.sysml.runtime.util.UtilFunctions; import org.apache.sysml.utils.Statistics; public class SparkExecutionContext extends ExecutionContext { - private static final Log LOG = LogFactory.getLog(SparkExecutionContext.class.getName()); - + private static final boolean LDEBUG = false; //local debug flag + //internal configurations private static boolean LAZY_SPARKCTX_CREATION = true; private static boolean ASYNCHRONOUS_VAR_DESTROY = true; private static boolean FAIR_SCHEDULER_MODE = true; //executor memory and relative fractions as obtained from the spark configuration - private static long _memExecutors = -1; //mem per executors - private static double 
_memRatioData = -1; - private static double _memRatioShuffle = -1; - private static int _numExecutors = -1; //total executors - private static int _defaultPar = -1; //total vcores - private static boolean _confOnly = false; //infrastructure info based on config + private static SparkClusterConfig _sconf = null; // Only one SparkContext may be active per JVM. You must stop() the active SparkContext before creating a new one. // This limitation may eventually be removed; see SPARK-2243 for more details. private static JavaSparkContext _spctx = null; - protected SparkExecutionContext(Program prog) - { + static { + // for internal debugging only + if( LDEBUG ) { + Logger.getLogger("org.apache.sysml.runtime.controlprogram.context") + .setLevel((Level) Level.DEBUG); + } + } + + protected SparkExecutionContext(Program prog) { //protected constructor to force use of ExecutionContextFactory this( true, prog ); } @@ -806,161 +811,6 @@ public class SparkExecutionContext extends ExecutionContext return nnz; } - - /** - * Returns the available memory budget for broadcast variables in bytes. - * In detail, this takes into account the total executor memory as well - * as relative ratios for data and shuffle. Note, that this is a conservative - * estimate since both data memory and shuffle memory might not be fully - * utilized. 
- * - * @return - */ - public static double getBroadcastMemoryBudget() - { - if( _memExecutors < 0 || _memRatioData < 0 || _memRatioShuffle < 0 ) - analyzeSparkConfiguation(); - - //70% of remaining free memory - double membudget = OptimizerUtils.MEM_UTIL_FACTOR * - ( _memExecutors - - _memExecutors*(_memRatioData+_memRatioShuffle) ); - - return membudget; - } - - /** - * - * @return - */ - public static double getConfiguredTotalDataMemory() { - return getConfiguredTotalDataMemory(false); - } - - /** - * - * @param refresh - * @return - */ - public static double getConfiguredTotalDataMemory(boolean refresh) - { - if( _memExecutors < 0 || _memRatioData < 0 ) - analyzeSparkConfiguation(); - - //always get the current num executors on refresh because this might - //change if not all executors are initially allocated and it is plan-relevant - if( refresh && !_confOnly ) { - JavaSparkContext jsc = getSparkContextStatic(); - int numExec = Math.max(jsc.sc().getExecutorMemoryStatus().size() - 1, 1); - return _memExecutors * _memRatioData * numExec; - } - else - return ( _memExecutors * _memRatioData * _numExecutors ); - } - - public static int getNumExecutors() - { - if( _numExecutors < 0 ) - analyzeSparkConfiguation(); - - return _numExecutors; - } - - public static int getDefaultParallelism() { - return getDefaultParallelism(false); - } - - /** - * - * @return - */ - public static int getDefaultParallelism(boolean refresh) - { - if( _defaultPar < 0 && !refresh ) - analyzeSparkConfiguation(); - - //always get the current default parallelism on refresh because this might - //change if not all executors are initially allocated and it is plan-relevant - if( refresh && !_confOnly ) - return getSparkContextStatic().defaultParallelism(); - else - return _defaultPar; - } - - /** - * - */ - public static void analyzeSparkConfiguation() - { - SparkConf sconf = new SparkConf(); - - //parse absolute executor memory - String tmp = sconf.get("spark.executor.memory", "512m"); - if ( 
tmp.endsWith("g") || tmp.endsWith("G") ) - _memExecutors = Long.parseLong(tmp.substring(0,tmp.length()-1)) * 1024 * 1024 * 1024; - else if ( tmp.endsWith("m") || tmp.endsWith("M") ) - _memExecutors = Long.parseLong(tmp.substring(0,tmp.length()-1)) * 1024 * 1024; - else if( tmp.endsWith("k") || tmp.endsWith("K") ) - _memExecutors = Long.parseLong(tmp.substring(0,tmp.length()-1)) * 1024; - else - _memExecutors = Long.parseLong(tmp.substring(0,tmp.length()-2)); - - //get data and shuffle memory ratios (defaults not specified in job conf) - _memRatioData = sconf.getDouble("spark.storage.memoryFraction", 0.6); //default 60% - _memRatioShuffle = sconf.getDouble("spark.shuffle.memoryFraction", 0.2); //default 20% - - int numExecutors = sconf.getInt("spark.executor.instances", -1); - int numCoresPerExec = sconf.getInt("spark.executor.cores", -1); - int defaultPar = sconf.getInt("spark.default.parallelism", -1); - - if( numExecutors > 1 && (defaultPar > 1 || numCoresPerExec > 1) ) { - _numExecutors = numExecutors; - _defaultPar = (defaultPar>1) ? defaultPar : numExecutors * numCoresPerExec; - _confOnly = true; - } - else { - //get default parallelism (total number of executors and cores) - //note: spark context provides this information while conf does not - //(for num executors we need to correct for driver and local mode) - JavaSparkContext jsc = getSparkContextStatic(); - _numExecutors = Math.max(jsc.sc().getExecutorMemoryStatus().size() - 1, 1); - _defaultPar = jsc.defaultParallelism(); - _confOnly = false; //implies env info refresh w/ spark context - } - - //note: required time for infrastructure analysis on 5 node cluster: ~5-20ms. 
- } - - /** - * - */ - public void checkAndRaiseValidationWarningJDKVersion() - { - //check for jdk version less than jdk8 - boolean isLtJDK8 = InfrastructureAnalyzer.isJavaVersionLessThanJDK8(); - - //check multi-threaded executors - int numExecutors = getNumExecutors(); - int numCores = getDefaultParallelism(); - boolean multiThreaded = (numCores > numExecutors); - - //check for jdk version less than 8 (and raise warning if multi-threaded) - if( isLtJDK8 && multiThreaded) - { - //get the jre version - String version = System.getProperty("java.version"); - - LOG.warn("########################################################################################"); - LOG.warn("### WARNING: Multi-threaded text reblock may lead to thread contention on JRE < 1.8 ####"); - LOG.warn("### java.version = " + version); - LOG.warn("### total number of executors = " + numExecutors); - LOG.warn("### total number of cores = " + numCores); - LOG.warn("### JDK-7032154: Performance tuning of sun.misc.FloatingDecimal/FormattedFloatingDecimal"); - LOG.warn("### Workaround: Convert text to binary w/ changed configuration of one executor per core"); - LOG.warn("########################################################################################"); - } - } - /////////////////////////////////////////// // Cleanup of RDDs and Broadcast variables /////// @@ -1246,6 +1096,7 @@ public class SparkExecutionContext extends ExecutionContext return false; } + /////////////////////////////////////////// // Debug String Handling (see explain); TODO to be removed /////// @@ -1353,4 +1204,278 @@ public class SparkExecutionContext extends ExecutionContext } + + /////////////////////////////////////////// + // Spark configuration handling + /////// + + /** + * Obtains the lazily analyzed spark cluster configuration. 
+ * + * @return + */ + public static SparkClusterConfig getSparkClusterConfig() { + //lazy creation of spark cluster config + if( _sconf == null ) + _sconf = new SparkClusterConfig(); + return _sconf; + } + + /** + * Obtains the available memory budget for broadcast variables in bytes. + * + * @return + */ + public static double getBroadcastMemoryBudget() { + return getSparkClusterConfig() + .getBroadcastMemoryBudget(); + } + + /** + * Obtain the available memory budget for data storage in bytes. + * + * @param min flag for minimum data budget + * @param refresh flag for refresh with spark context + * @return + */ + public static double getDataMemoryBudget(boolean min, boolean refresh) { + return getSparkClusterConfig() + .getDataMemoryBudget(min, refresh); + } + + /** + * Obtain the number of executors in the cluster (excluding the driver). + * + * @return + */ + public static int getNumExecutors() { + return getSparkClusterConfig() + .getNumExecutors(); + } + + /** + * Obtain the default degree of parallelism (cores in the cluster). 
+ * + * @param refresh flag for refresh with spark context + * @return + */ + public static int getDefaultParallelism(boolean refresh) { + return getSparkClusterConfig() + .getDefaultParallelism(refresh); + } + + /** + * + */ + public void checkAndRaiseValidationWarningJDKVersion() + { + //check for jdk version less than jdk8 + boolean isLtJDK8 = InfrastructureAnalyzer.isJavaVersionLessThanJDK8(); + + //check multi-threaded executors + int numExecutors = getNumExecutors(); + int numCores = getDefaultParallelism(false); + boolean multiThreaded = (numCores > numExecutors); + + //check for jdk version less than 8 (and raise warning if multi-threaded) + if( isLtJDK8 && multiThreaded) + { + //get the jre version + String version = System.getProperty("java.version"); + + LOG.warn("########################################################################################"); + LOG.warn("### WARNING: Multi-threaded text reblock may lead to thread contention on JRE < 1.8 ####"); + LOG.warn("### java.version = " + version); + LOG.warn("### total number of executors = " + numExecutors); + LOG.warn("### total number of cores = " + numCores); + LOG.warn("### JDK-7032154: Performance tuning of sun.misc.FloatingDecimal/FormattedFloatingDecimal"); + LOG.warn("### Workaround: Convert text to binary w/ changed configuration of one executor per core"); + LOG.warn("########################################################################################"); + } + } + + /** + * Captures relevant spark cluster configuration properties, e.g., memory budgets and + * degree of parallelism. This configuration abstracts legacy (< Spark 1.6) and current + * configurations and provides a unified view. 
+ */ + private static class SparkClusterConfig + { + //broadcasts are stored in mem-and-disk in data space, this config + //defines the fraction of data space to be used as broadcast budget + private static final double BROADCAST_DATA_FRACTION = 0.3; + + //forward private config from Spark's UnifiedMemoryManager.scala (>1.6) + private static final long RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024; + + //meta configurations + private boolean _legacyVersion = false; //spark version <1.6 + private boolean _confOnly = false; //infrastructure info based on config + + //memory management configurations + private long _memExecutor = -1; //mem per executor + private double _memDataMinFrac = -1; //minimum data fraction + private double _memDataMaxFrac = -1; //maximum data fraction + private double _memBroadcastFrac = -1; //broadcast fraction + + //degree of parallelism configurations + private int _numExecutors = -1; //total executors + private int _defaultPar = -1; //total vcores + + public SparkClusterConfig() + { + SparkConf sconf = new SparkConf(); + + //parse version and config //TODO avoid spark context creation if possible + JavaSparkContext jsc = getSparkContextStatic(); + _legacyVersion = (UtilFunctions.compareVersion(jsc.version(), "1.6.0") < 0 + || sconf.getBoolean("spark.memory.useLegacyMode", false) ); + + //obtain basic spark configurations + if( _legacyVersion ) + analyzeSparkConfiguationLegacy(sconf); + else + analyzeSparkConfiguation(sconf); + + //log debug of created spark cluster config + if( LOG.isDebugEnabled() ) + LOG.debug( this.toString() ); + } + + /** + * + * @return + */ + public long getBroadcastMemoryBudget() { + return (long) (_memExecutor * _memBroadcastFrac); + } + + /** + * + * @param min + * @param refresh + * @return + */ + public long getDataMemoryBudget(boolean min, boolean refresh) { + //always get the current num executors on refresh because this might + //change if not all executors are initially allocated and it is plan-relevant 
+ int numExec = _numExecutors; + if( refresh && !_confOnly ) { + JavaSparkContext jsc = getSparkContextStatic(); + numExec = Math.max(jsc.sc().getExecutorMemoryStatus().size() - 1, 1); + } + + //compute data memory budget + return (long) ( numExec * _memExecutor * + (min ? _memDataMinFrac : _memDataMaxFrac) ); + } + + /** + * + * @return + */ + public int getNumExecutors() { + if( _numExecutors < 0 ) + analyzeSparkParallelismConfiguation(null); + return _numExecutors; + } + + /** + * + * @param refresh + * @return + */ + public int getDefaultParallelism(boolean refresh) { + if( _defaultPar < 0 && !refresh ) + analyzeSparkParallelismConfiguation(null); + + //always get the current default parallelism on refresh because this might + //change if not all executors are initially allocated and it is plan-relevant + return ( refresh && !_confOnly ) ? + getSparkContextStatic().defaultParallelism() : _defaultPar; + } + + /** + * + * @param conf + */ + public void analyzeSparkConfiguationLegacy(SparkConf conf) { + //ensure allocated spark conf + SparkConf sconf = (conf == null) ? new SparkConf() : conf; + + //parse absolute executor memory + _memExecutor = UtilFunctions.parseMemorySize( + sconf.get("spark.executor.memory", "1g")); + + //get data and shuffle memory ratios (defaults not specified in job conf) + double dataFrac = sconf.getDouble("spark.storage.memoryFraction", 0.6); //default 60% + _memDataMinFrac = dataFrac; + _memDataMaxFrac = dataFrac; + _memBroadcastFrac = dataFrac * BROADCAST_DATA_FRACTION; //default 18% + + //analyze spark degree of parallelism + analyzeSparkParallelismConfiguation(sconf); + } + + /** + * + * @param conf + */ + public void analyzeSparkConfiguation(SparkConf conf) { + //ensure allocated spark conf + SparkConf sconf = (conf == null) ? 
new SparkConf() : conf; + + //parse absolute executor memory, incl fixed cut off + _memExecutor = UtilFunctions.parseMemorySize( + sconf.get("spark.executor.memory", "1g")) + - RESERVED_SYSTEM_MEMORY_BYTES; + + //get data and shuffle memory ratios (defaults not specified in job conf) + _memDataMinFrac = sconf.getDouble("spark.memory.storageFraction", 0.5); //default 50% + _memDataMaxFrac = sconf.getDouble("spark.memory.fraction", 0.75); //default 75% + _memBroadcastFrac = _memDataMaxFrac * BROADCAST_DATA_FRACTION; //default 22.5% + + //analyze spark degree of parallelism + analyzeSparkParallelismConfiguation(sconf); + } + + /** + * + * @param sconf + */ + private void analyzeSparkParallelismConfiguation(SparkConf sconf) { + int numExecutors = sconf.getInt("spark.executor.instances", -1); + int numCoresPerExec = sconf.getInt("spark.executor.cores", -1); + int defaultPar = sconf.getInt("spark.default.parallelism", -1); + + if( numExecutors > 1 && (defaultPar > 1 || numCoresPerExec > 1) ) { + _numExecutors = numExecutors; + _defaultPar = (defaultPar>1) ? 
defaultPar : numExecutors * numCoresPerExec; + _confOnly = true; + } + else { + //get default parallelism (total number of executors and cores) + //note: spark context provides this information while conf does not + //(for num executors we need to correct for driver and local mode) + JavaSparkContext jsc = getSparkContextStatic(); + _numExecutors = Math.max(jsc.sc().getExecutorMemoryStatus().size() - 1, 1); + _defaultPar = jsc.defaultParallelism(); + _confOnly = false; //implies env info refresh w/ spark context + } + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("SparkClusterConfig: \n"); + sb.append("-- legacyVersion = " + _legacyVersion + " ("+getSparkContextStatic().version()+")\n" ); + sb.append("-- confOnly = " + _confOnly + "\n"); + sb.append("-- memExecutor = " + _memExecutor + "\n"); + sb.append("-- memDataMinFrac = " + _memDataMinFrac + "\n"); + sb.append("-- memDataMaxFrac = " + _memDataMaxFrac + "\n"); + sb.append("-- memBroadcastFrac = " + _memBroadcastFrac + "\n"); + sb.append("-- numExecutors = " + _numExecutors + "\n"); + sb.append("-- defaultPar = " + _defaultPar + "\n"); + return sb.toString(); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/3ef04409/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java index 3d595b8..75c96ba 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java @@ -27,6 +27,7 @@ import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import 
org.apache.sysml.conf.ConfigurationManager; import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames; +import org.apache.sysml.runtime.util.UtilFunctions; /** * Central place for analyzing and obtaining static infrastructure properties @@ -347,31 +348,23 @@ public class InfrastructureAnalyzer try { StringTokenizer st = new StringTokenizer( javaOpts, " " ); - while( st.hasMoreTokens() ) - { + while( st.hasMoreTokens() ) { String arg = st.nextToken(); if( !arg.startsWith("-Xmx") ) //search for max mem continue; - arg = arg.substring(4); //cut off "-Xmx" + //cut off "-Xmx" parameter + arg = arg.substring(4); + //parse number and unit - if ( arg.endsWith("g") || arg.endsWith("G") ) - ret = Long.parseLong(arg.substring(0,arg.length()-1)) * 1024 * 1024 * 1024; - else if ( arg.endsWith("m") || arg.endsWith("M") ) - ret = Long.parseLong(arg.substring(0,arg.length()-1)) * 1024 * 1024; - else if( arg.endsWith("k") || arg.endsWith("K") ) - ret = Long.parseLong(arg.substring(0,arg.length()-1)) * 1024; - else - ret = Long.parseLong(arg.substring(0,arg.length()-2)); + ret = UtilFunctions.parseMemorySize(arg); } - if( ret < 0 ) // no argument found - { + if( ret < 0 ) { // no argument found ret = DEFAULT_JVM_SIZE; } } - catch(Exception ex) - { + catch(Exception ex) { //if anything breaks during parsing (e.g., because args not specified correctly) ret = DEFAULT_JVM_SIZE; } @@ -464,14 +457,8 @@ public class InfrastructureAnalyzer //step 2: analyze if used jdk older than jdk8 String version = System.getProperty("java.version"); - //parse jre version - int ix1 = version.indexOf('.'); - int ix2 = version.indexOf('.', ix1+1); - int versionp1 = Integer.parseInt(version.substring(0, ix1)); - int versionp2 = Integer.parseInt(version.substring(ix1+1, ix2)); - //check for jdk version less than 8 (and raise warning if multi-threaded) - _isLtJDK8 = (versionp1 == 1 && versionp2 < 8); + _isLtJDK8 = (UtilFunctions.compareVersion(version, "1.8") < 0); } /** 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/3ef04409/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java index 241b178..e292b39 100644 --- a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java +++ b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java @@ -353,6 +353,27 @@ public class UtilFunctions } } + /** + * Compares two version strings of format x.y.z, where x is major, + * y is minor, and z is maintenance release. + * + * @param version1 + * @param version2 + * @return + */ + public static int compareVersion( String version1, String version2 ) { + String[] partsv1 = version1.split("\\."); + String[] partsv2 = version2.split("\\."); + int len = Math.min(partsv1.length, partsv2.length); + for( int i=0; i<partsv1.length && i<len; i++ ) { + Integer iv1 = Integer.parseInt(partsv1[i]); + Integer iv2 = Integer.parseInt(partsv2[i]); + if( iv1.compareTo(iv2) != 0 ) + return iv1.compareTo(iv2); + } + return 0; //equal + } + public static boolean isIntegerNumber( String str ) { byte[] c = str.getBytes(); @@ -402,4 +423,22 @@ public class UtilFunctions } return sb.toString(); } + + /** + * Parses a memory size with optional g/m/k quantifiers into its + * number representation. 
+ * + * @param arg + * @return + */ + public static long parseMemorySize(String arg) { + if ( arg.endsWith("g") || arg.endsWith("G") ) + return Long.parseLong(arg.substring(0,arg.length()-1)) * 1024 * 1024 * 1024; + else if ( arg.endsWith("m") || arg.endsWith("M") ) + return Long.parseLong(arg.substring(0,arg.length()-1)) * 1024 * 1024; + else if( arg.endsWith("k") || arg.endsWith("K") ) + return Long.parseLong(arg.substring(0,arg.length()-1)) * 1024; + else + return Long.parseLong(arg.substring(0,arg.length())); + } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/3ef04409/src/main/java/org/apache/sysml/utils/Explain.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/utils/Explain.java b/src/main/java/org/apache/sysml/utils/Explain.java index c070378..6b266ea 100644 --- a/src/main/java/org/apache/sysml/utils/Explain.java +++ b/src/main/java/org/apache/sysml/utils/Explain.java @@ -117,14 +117,14 @@ public class Explain if( OptimizerUtils.isSparkExecutionMode() ) { - if( counts.numJobs-counts.numReblocks == 0 ) - { + if( counts.numJobs-counts.numReblocks == 0 ) { //avoid unnecessary lazy spark context creation on access to memory configurations - sb.append( "?MB/?MB" ); + sb.append( "?MB/?MB/?MB" ); } - else //default - { - sb.append( OptimizerUtils.toMB(SparkExecutionContext.getConfiguredTotalDataMemory()) ); + else { //default + sb.append( OptimizerUtils.toMB(SparkExecutionContext.getDataMemoryBudget(true, false)) ); + sb.append( "MB/" ); + sb.append( OptimizerUtils.toMB(SparkExecutionContext.getDataMemoryBudget(false, false)) ); sb.append( "MB/" ); sb.append( OptimizerUtils.toMB(SparkExecutionContext.getBroadcastMemoryBudget()) ); sb.append( "MB" ); @@ -165,15 +165,12 @@ public class Explain if( OptimizerUtils.isSparkExecutionMode() ) //SP { - if( counts.numJobs-counts.numReblocks == 0 ) - { + if( counts.numJobs-counts.numReblocks == 0 ) { //avoid unnecessary lazy spark context 
creation on access to memory configurations sb.append( "?" ); } - else //default - { - int rk = SparkExecutionContext.getDefaultParallelism(); - sb.append( rk ); + else { //default + sb.append( SparkExecutionContext.getDefaultParallelism(false) ); } } else //MR
