Repository: systemml
Updated Branches:
  refs/heads/master 6992c3896 -> 6b877b04c
[SYSTEMML-1421] Remove unnecessary checks for Java 7 and below

We dropped support for Java 6 and 7 a while ago, and meanwhile we
leverage Java 8 features such as lambda expressions extensively, which
means that SystemML will not run with Java 7 or below. Accordingly,
this patch removes the now-unnecessary checks for Java versions < 8
(which had severe scalability issues for multi-threaded string-to-double
parsing, due to internal static synchronized caches, as used in the
text and csv readers).

Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/6b877b04
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/6b877b04
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/6b877b04

Branch: refs/heads/master
Commit: 6b877b04cf9386107ced2afbb736b3c39161d9c8
Parents: 6992c38
Author: Matthias Boehm <[email protected]>
Authored: Sat Dec 2 15:44:35 2017 -0800
Committer: Matthias Boehm <[email protected]>
Committed: Sat Dec 2 15:44:35 2017 -0800

----------------------------------------------------------------------
 .../org/apache/sysml/hops/OptimizerUtils.java   |  11 --
 .../context/SparkExecutionContext.java          |  27 -----
 .../parfor/stat/InfrastructureAnalyzer.java     | 103 ++++++-------
 .../spark/CSVReblockSPInstruction.java          |   3 -
 .../spark/ReblockSPInstruction.java             |  30 ++----
 5 files changed, 41 insertions(+), 133 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/6b877b04/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
index 9cc2442..d978500 100644
--- a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
+++ b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
@@ -22,8 +22,6 @@ package org.apache.sysml.hops;
 import java.util.Arrays;
 import java.util.HashMap;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.sysml.api.DMLScript;
@@ -62,8 +60,6 @@ import org.apache.sysml.yarn.ropt.YarnClusterAnalyzer;
 
 public class OptimizerUtils 
 {
-	private static final Log LOG = LogFactory.getLog(OptimizerUtils.class.getName());
-	
 	////////////////////////////////////////////////////////
 	// Optimizer constants and flags (incl tuning knobs)  //
 	////////////////////////////////////////////////////////
@@ -364,13 +360,6 @@ public class OptimizerUtils
 			cconf.set(ConfigType.PARALLEL_CP_READ_BINARYFORMATS, false);
 			cconf.set(ConfigType.PARALLEL_CP_WRITE_BINARYFORMATS, false);
 		}
-		else if( InfrastructureAnalyzer.isJavaVersionLessThanJDK8()
-			&& InfrastructureAnalyzer.getLocalParallelism() > 1 )
-		{
-			LOG.warn("Auto-disable multi-threaded text read for 'text' and 'csv' due to thread contention on JRE < 1.8"
-				+ " (java.version="+ System.getProperty("java.version")+").");
-			cconf.set(ConfigType.PARALLEL_CP_READ_TEXTFORMATS, false);
-		}
 		
 		//handle parallel matrix mult / rand configuration
 		if (!dmlconf.getBooleanValue(DMLConfig.CP_PARALLEL_OPS)) {

http://git-wip-us.apache.org/repos/asf/systemml/blob/6b877b04/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
index 84fdd88..00710c3 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
@@ -1369,33 +1369,6 @@ public class SparkExecutionContext extends ExecutionContext
 			.getDefaultParallelism(refresh);
 	}
 
-	public void checkAndRaiseValidationWarningJDKVersion()
-	{
-		//check for jdk version less than jdk8
-		boolean isLtJDK8 = InfrastructureAnalyzer.isJavaVersionLessThanJDK8();
-
-		//check multi-threaded executors
-		int numExecutors = getNumExecutors();
-		int numCores = getDefaultParallelism(false);
-		boolean multiThreaded = (numCores > numExecutors);
-
-		//check for jdk version less than 8 (and raise warning if multi-threaded)
-		if( isLtJDK8 && multiThreaded)
-		{
-			//get the jre version
-			String version = System.getProperty("java.version");
-
-			LOG.warn("########################################################################################");
-			LOG.warn("### WARNING: Multi-threaded text reblock may lead to thread contention on JRE < 1.8 ####");
-			LOG.warn("### java.version = " + version);
-			LOG.warn("### total number of executors = " + numExecutors);
-			LOG.warn("### total number of cores = " + numCores);
-			LOG.warn("### JDK-7032154: Performance tuning of sun.misc.FloatingDecimal/FormattedFloatingDecimal");
-			LOG.warn("### Workaround: Convert text to binary w/ changed configuration of one executor per core");
-			LOG.warn("########################################################################################");
-		}
-	}
-
 	/**
 	 * Captures relevant spark cluster configuration properties, e.g., memory budgets and
 	 * degree of parallelism. This configuration abstracts legacy (< Spark 1.6) and current

http://git-wip-us.apache.org/repos/asf/systemml/blob/6b877b04/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java
index 1f1fce2..656089f 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java
@@ -40,13 +40,11 @@ import org.apache.sysml.utils.lite.LiteCheck;
  */
 public class InfrastructureAnalyzer 
 {
-	
 	public static final long DEFAULT_JVM_SIZE = 512 * 1024 * 1024;
 	
 	//static local master node properties
 	private static int _localPar = -1;
 	private static long _localJVMMaxMem = -1;
-	private static boolean _isLtJDK8 = false;
 	
 	//static hadoop cluster properties
 	private static int _remotePar = -1;
@@ -60,8 +58,7 @@
 	private static boolean _yarnEnabled = false;
 	
 	//static initialization, called for each JVM (on each node)
-	static
-	{
+	static {
 		//analyze local node properties
 		analyzeLocalMachine();
 
@@ -69,11 +66,6 @@
 		//analyze remote Hadoop cluster properties
 		//analyzeHadoopCluster(); //note: due to overhead - analyze on-demand
 	}
 	
-	public static boolean isJavaVersionLessThanJDK8()
-	{
-		return _isLtJDK8;
-	}
-	
 	///////
 	//methods for obtaining parallelism properties
@@ -83,8 +75,7 @@
 	 * 
 	 * @return number of local processors of the current node
 	 */
-	public static int getLocalParallelism()
-	{
+	public static int getLocalParallelism() {
 		return _localPar;
 	}
@@ -111,8 +102,7 @@
 		return _remoteParMap;
 	}
 	
-	public static void setRemoteParallelMapTasks(int pmap)
-	{
+	public static void setRemoteParallelMapTasks(int pmap) {
 		_remoteParMap = pmap;
 	}
@@ -127,8 +117,7 @@
 		return _remoteParReduce;
 	}
 	
-	public static void setRemoteParallelReduceTasks(int preduce)
-	{
+	public static void setRemoteParallelReduceTasks(int preduce) {
 		_remoteParReduce = preduce;
 	}
@@ -140,13 +129,11 @@
 	 * 
 	 * @return maximum memory of the current JVM
 	 */
-	public static long getLocalMaxMemory()
-	{
+	public static long getLocalMaxMemory() {
 		return _localJVMMaxMem;
 	}
 	
-	public static void setLocalMaxMemory( long localMem )
-	{
+	public static void setLocalMaxMemory( long localMem ) {
 		_localJVMMaxMem = localMem;
 	}
@@ -161,8 +148,7 @@
 		return _remoteJVMMaxMemMap;
 	}
 	
-	public static void setRemoteMaxMemoryMap( long remoteMem )
-	{
+	public static void setRemoteMaxMemoryMap( long remoteMem ) {
 		_remoteJVMMaxMemMap = remoteMem;
 	}
@@ -177,8 +163,7 @@
 		return _remoteJVMMaxMemReduce;
 	}
 	
-	public static void setRemoteMaxMemoryReduce( long remoteMem )
-	{
+	public static void setRemoteMaxMemoryReduce( long remoteMem ) {
 		_remoteJVMMaxMemReduce = remoteMem;
 	}
@@ -199,8 +184,7 @@
 		return _localJT;
 	}
 	
-	public static boolean isLocalMode(JobConf job)
-	{
+	public static boolean isLocalMode(JobConf job) {
 		// Due to a bug in HDP related to fetching the "mode" of execution within mappers,
 		// we explicitly probe the relevant properties instead of relying on results from
 		// analyzeHadoopCluster().
@@ -219,8 +203,7 @@
 	 * 
 	 * @return maximum local parallelism constraint
 	 */
-	public static int getCkMaxCP()
-	{
+	public static int getCkMaxCP() {
 		//default value (if not specified)
 		return getLocalParallelism();
 	}
@@ -230,8 +213,7 @@
 	 * 
 	 * @return maximum remote parallelism constraint
 	 */
-	public static int getCkMaxMR()
-	{
+	public static int getCkMaxMR() {
 		if( OptimizerUtils.isSparkExecutionMode() )
 			return SparkExecutionContext.getDefaultParallelism(true);
 		else
@@ -243,8 +225,7 @@
 	 * 
 	 * @return maximum memory constraint
 	 */
-	public static long getCmMax()
-	{
+	public static long getCmMax() {
 		//default value (if not specified)
 		return Math.min( getLocalMaxMemory(), getRemoteMaxMemoryMap() );
 	}
@@ -297,28 +278,22 @@
 		return ret;
 	}
 	
-	public static void setMaxMemoryOpt(JobConf job, String key, long bytes)
-	{
+	public static void setMaxMemoryOpt(JobConf job, String key, long bytes) {
 		String javaOptsOld = job.get( key );
 		String javaOptsNew = null;
-		
-		//StringTokenizer st = new StringTokenizer( javaOptsOld, " " );
 		String[] tokens = javaOptsOld.split(" "); //account also for no ' '
 		StringBuilder sb = new StringBuilder();
-		for( String arg : tokens )
-		{
-			if( arg.startsWith("-Xmx") ) //search for max mem
-			{
+		for( String arg : tokens ) {
+			if( arg.startsWith("-Xmx") ) { //search for max mem
 				sb.append("-Xmx");
 				sb.append( (bytes/(1024*1024)) );
 				sb.append("M");
 			}
 			else
 				sb.append(arg);
-			
 			sb.append(" ");
 		}
 		
-		javaOptsNew = sb.toString().trim();
+		javaOptsNew = sb.toString().trim();
 		job.set(key, javaOptsNew);
 	}
@@ -343,22 +318,18 @@
 		ClusterStatus stat = client.getClusterStatus();
 		
 		double ret = 0.0;
-		if( stat != null ) //if in cluster mode
-		{
-			if( mapOnly )
-			{
+		if( stat != null ) { //if in cluster mode
+			if( mapOnly ) {
 				int capacity = stat.getMaxMapTasks();
 				int current = stat.getMapTasks();
 				ret = ((double)current) / capacity;
 			}
-			else
-			{
+			else {
 				int capacity = stat.getMaxMapTasks() + stat.getMaxReduceTasks();
 				int current = stat.getMapTasks() + stat.getReduceTasks();
 				ret = ((double)current) / capacity;
 			}
 		}
-		
 		return ret;
 	}
@@ -368,31 +339,21 @@
 	/**
 	 * Analyzes properties of local machine and JVM.
 	 */
-	private static void analyzeLocalMachine()
-	{
+	private static void analyzeLocalMachine() {
 		//step 1: basic parallelism and memory
 		_localPar = Runtime.getRuntime().availableProcessors();
 		_localJVMMaxMem = Runtime.getRuntime().maxMemory();
-		
-		//step 2: analyze if used jdk older than jdk8
-		String version = System.getProperty("java.version");
-		
-		//check for jdk version less than 8 (and raise warning if multi-threaded)
-		_isLtJDK8 = (UtilFunctions.compareVersion(version, "1.8") < 0);
 	}
 	
 	/**
 	 * Analyzes properties of hadoop cluster and configuration.
 	 */
-	private static void analyzeHadoopCluster()
-	{
-		try
-		{
+	private static void analyzeHadoopCluster() {
+		try {
 			JobConf job = ConfigurationManager.getCachedJobConf();
 			JobClient client = new JobClient(job);
 			ClusterStatus stat = client.getClusterStatus();
-			if( stat != null ) //if in cluster mode
-			{
+			if( stat != null ) { //if in cluster mode
 				//analyze cluster status
 				_remotePar = stat.getTaskTrackers();
 				_remoteParMap = stat.getMaxMapTasks();
@@ -400,10 +361,9 @@
 				
 				//analyze pure configuration properties
 				analyzeHadoopConfiguration();
-			}
+			}
 		}
-		catch (IOException e)
-		{
+		catch (IOException e) {
 			throw new RuntimeException("Unable to analyze infrastructure.",e);
 		}
 	}
@@ -412,12 +372,11 @@
 	 * Analyzes only properties of hadoop configuration in order to prevent
 	 * expensive call to cluster status .
 	 */
-	private static void analyzeHadoopConfiguration()
-	{
+	private static void analyzeHadoopConfiguration() {
 		JobConf job = ConfigurationManager.getCachedJobConf();
 		
 		_remoteMRSortMem = (1024*1024) * job.getLong(MRConfigurationNames.MR_TASK_IO_SORT_MB,100); //1MB
-		
+		
 		//handle jvm max mem (map mem budget is relevant for map-side distcache and parfor)
 		//(for robustness we probe both: child and map configuration parameters)
 		String javaOpts1 = job.get(MRConfigurationNames.MR_CHILD_JAVA_OPTS); //internally mapred/mapreduce synonym
@@ -441,15 +400,13 @@
 		_yarnEnabled = (framework!=null && framework.equals("yarn"));
 		
 		//analyze if local mode (internally requires yarn_enabled)
-		_localJT = analyzeLocalMode(job);
+		_localJT = analyzeLocalMode(job);
 	}
 	
-	private static boolean analyzeLocalMode(JobConf job)
-	{
+	private static boolean analyzeLocalMode(JobConf job) {
 		//analyze if local mode (if yarn enabled, we always assume cluster mode
 		//in order to workaround configuration issues on >=Hadoop 2.6)
 		String jobTracker = job.get(MRConfigurationNames.MR_JOBTRACKER_ADDRESS, "local");
-		return "local".equals(jobTracker)
-			& !isYarnEnabled();
+		return "local".equals(jobTracker) && !isYarnEnabled();
 	}
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/6b877b04/src/main/java/org/apache/sysml/runtime/instructions/spark/CSVReblockSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CSVReblockSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CSVReblockSPInstruction.java
index fb80deb..cd8635a 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CSVReblockSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CSVReblockSPInstruction.java
@@ -114,9 +114,6 @@ public class CSVReblockSPInstruction extends UnarySPInstruction {
 			return;
 		}
 		
-		//check jdk version (prevent double.parseDouble contention on <jdk8)
-		sec.checkAndRaiseValidationWarningJDKVersion();
-		
 		//execute matrix/frame csvreblock
 		JavaPairRDD<?,?> out = null;
 		if( input1.getDataType() == DataType.MATRIX )

http://git-wip-us.apache.org/repos/asf/systemml/blob/6b877b04/src/main/java/org/apache/sysml/runtime/instructions/spark/ReblockSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/ReblockSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/ReblockSPInstruction.java
index d529084..609b399 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/ReblockSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/ReblockSPInstruction.java
@@ -121,18 +121,14 @@ public class ReblockSPInstruction extends UnarySPInstruction {
 		MatrixCharacteristics mc = sec.getMatrixCharacteristics(input1.getName());
 		MatrixCharacteristics mcOut = sec.getMatrixCharacteristics(output.getName());
 		
-		if(iinfo == InputInfo.TextCellInputInfo || iinfo == InputInfo.MatrixMarketInputInfo )
-		{
-			//check jdk version (prevent double.parseDouble contention on <jdk8)
-			sec.checkAndRaiseValidationWarningJDKVersion();
-			
+		if(iinfo == InputInfo.TextCellInputInfo || iinfo == InputInfo.MatrixMarketInputInfo ) {
 			//get the input textcell rdd
-			JavaPairRDD<LongWritable, Text> lines = (JavaPairRDD<LongWritable, Text>)
-					sec.getRDDHandleForVariable(input1.getName(), iinfo);
+			JavaPairRDD<LongWritable, Text> lines = (JavaPairRDD<LongWritable, Text>)
+				sec.getRDDHandleForVariable(input1.getName(), iinfo);
 			
 			//convert textcell to binary block
-			JavaPairRDD<MatrixIndexes, MatrixBlock> out =
-					RDDConverterUtils.textCellToBinaryBlock(sec.getSparkContext(), lines, mcOut, outputEmptyBlocks);
+			JavaPairRDD<MatrixIndexes, MatrixBlock> out =
+				RDDConverterUtils.textCellToBinaryBlock(sec.getSparkContext(), lines, mcOut, outputEmptyBlocks);
 			
 			//put output RDD handle into symbol table
 			sec.setRDDHandleForVariable(output.getName(), out);
@@ -200,18 +196,14 @@ public class ReblockSPInstruction extends UnarySPInstruction {
 		FrameObject fo = sec.getFrameObject(input1.getName());
 		MatrixCharacteristics mcOut = sec.getMatrixCharacteristics(output.getName());
 		
-		if(iinfo == InputInfo.TextCellInputInfo )
-		{
-			//check jdk version (prevent double.parseDouble contention on <jdk8)
-			sec.checkAndRaiseValidationWarningJDKVersion();
-			
+		if(iinfo == InputInfo.TextCellInputInfo ) {
 			//get the input textcell rdd
 			JavaPairRDD<LongWritable, Text> lines = (JavaPairRDD<LongWritable, Text>)
-					sec.getRDDHandleForVariable(input1.getName(), iinfo);
+				sec.getRDDHandleForVariable(input1.getName(), iinfo);
 			
 			//convert textcell to binary block
 			JavaPairRDD<Long, FrameBlock> out =
-					FrameRDDConverterUtils.textCellToBinaryBlock(sec.getSparkContext(), lines, mcOut, fo.getSchema());
+				FrameRDDConverterUtils.textCellToBinaryBlock(sec.getSparkContext(), lines, mcOut, fo.getSchema());
 			
 			//put output RDD handle into symbol table
 			sec.setRDDHandleForVariable(output.getName(), out);
@@ -225,8 +217,8 @@ public class ReblockSPInstruction extends UnarySPInstruction {
 			String delim = ",";
 			boolean fill = false;
 			double fillValue = 0;
-			if(fo.getFileFormatProperties() instanceof CSVFileFormatProperties
-				&& fo.getFileFormatProperties() != null )
+			if(fo.getFileFormatProperties() instanceof CSVFileFormatProperties
+				&& fo.getFileFormatProperties() != null )
 			{
 				CSVFileFormatProperties props = (CSVFileFormatProperties) fo.getFileFormatProperties();
 				hasHeader = props.hasHeader();
@@ -240,7 +232,7 @@ public class ReblockSPInstruction extends UnarySPInstruction {
 		}
 		else {
 			throw new DMLRuntimeException("The given InputInfo is not implemented "
-					+ "for ReblockSPInstruction: " + InputInfo.inputInfoToString(iinfo));
+				+ "for ReblockSPInstruction: " + InputInfo.inputInfoToString(iinfo));
 		}
 	}
 }
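
----------------------------------------------------------------------
Background on the removed checks: on JREs below 1.8, Double.parseDouble
funneled through sun.misc.FloatingDecimal, whose internal static
synchronized caches serialized concurrent callers (JDK-7032154); Java 8
removed that bottleneck, so the warnings and the auto-disabling of
parallel text reads above became dead code. The following standalone
micro-benchmark is an illustrative sketch only (not part of this commit;
class name, literals, and iteration count are arbitrary). Run it with 1
thread and with all cores on an old vs. a current JRE: on JRE < 1.8 the
multi-threaded run shows little speedup because the parse calls contend
on the shared caches.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ParseContentionCheck {
	//volatile sink that prevents the JIT from eliminating the parse loops
	private static volatile double blackhole = 0;

	public static void main(String[] args) throws Exception {
		final int numThreads = (args.length > 0) ? Integer.parseInt(args[0])
			: Runtime.getRuntime().availableProcessors();
		ExecutorService pool = Executors.newFixedThreadPool(numThreads);
		long t0 = System.nanoTime();
		for( int i = 0; i < numThreads; i++ ) {
			pool.execute(new Runnable() {
				@Override public void run() {
					//hot string-to-double parse loop, as in text/csv readers
					double sum = 0;
					for( int j = 0; j < 5000000; j++ )
						sum += Double.parseDouble("123456.789012345");
					blackhole += sum; //racy, but only used as a sink
				}
			});
		}
		pool.shutdown();
		pool.awaitTermination(1, TimeUnit.HOURS);
		System.out.println(numThreads + " threads: "
			+ (System.nanoTime() - t0) / 1000000 + " ms");
	}
}

The sketch deliberately avoids lambdas so the same class compiles and
runs on Java 7 for comparison.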
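
----------------------------------------------------------------------
For reference, the removed runtime probe boiled down to comparing the
java.version property against "1.8" (via UtilFunctions.compareVersion in
SystemML). A minimal self-contained sketch of that pattern follows; the
compareVersion helper here is a simplified stand-in written for
illustration, not SystemML's actual implementation.

public class VersionCheckSketch {
	public static void main(String[] args) {
		String version = System.getProperty("java.version"); //e.g. "1.8.0_151"
		boolean isLtJDK8 = compareVersion(version, "1.8") < 0;
		System.out.println("java.version=" + version + ", isLtJDK8=" + isLtJDK8);
	}

	//compare dot/underscore-separated numeric version parts, left to right
	private static int compareVersion(String v1, String v2) {
		String[] p1 = v1.split("[._]");
		String[] p2 = v2.split("[._]");
		for( int i = 0; i < Math.min(p1.length, p2.length); i++ ) {
			int cmp = Integer.compare(parseIntSafe(p1[i]), parseIntSafe(p2[i]));
			if( cmp != 0 )
				return cmp;
		}
		return Integer.compare(p1.length, p2.length);
	}

	//tolerate non-numeric parts such as "ea" in early-access versions
	private static int parseIntSafe(String s) {
		try { return Integer.parseInt(s); }
		catch(NumberFormatException e) { return 0; }
	}
}

With Java 8 as the floor, this kind of probing is unnecessary, hence its
removal above.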
