Repository: systemml
Updated Branches:
  refs/heads/master 6992c3896 -> 6b877b04c


[SYSTEMML-1421] Remove unnecessary checks for Java 7 and below

We dropped the support for Java 6 and 7 a while ago and meanwhile, we
leverage Java 8 features such as lambda expressions expensively, which
means that SystemML will not run with Java 7 or below. Accordingly, this
patch removes unnecessary checks for Java versions < 8 (which had severe
scalability issues for multi-threaded string-double parsing - due to
internal static synchronized caches - as used in text and csv readers).


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/6b877b04
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/6b877b04
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/6b877b04

Branch: refs/heads/master
Commit: 6b877b04cf9386107ced2afbb736b3c39161d9c8
Parents: 6992c38
Author: Matthias Boehm <[email protected]>
Authored: Sat Dec 2 15:44:35 2017 -0800
Committer: Matthias Boehm <[email protected]>
Committed: Sat Dec 2 15:44:35 2017 -0800

----------------------------------------------------------------------
 .../org/apache/sysml/hops/OptimizerUtils.java   |  11 --
 .../context/SparkExecutionContext.java          |  27 -----
 .../parfor/stat/InfrastructureAnalyzer.java     | 103 ++++++-------------
 .../spark/CSVReblockSPInstruction.java          |   3 -
 .../spark/ReblockSPInstruction.java             |  30 ++----
 5 files changed, 41 insertions(+), 133 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/6b877b04/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java 
b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
index 9cc2442..d978500 100644
--- a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
+++ b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
@@ -22,8 +22,6 @@ package org.apache.sysml.hops;
 import java.util.Arrays;
 import java.util.HashMap;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.sysml.api.DMLScript;
@@ -62,8 +60,6 @@ import org.apache.sysml.yarn.ropt.YarnClusterAnalyzer;
 
 public class OptimizerUtils 
 {
-       private static final Log LOG = 
LogFactory.getLog(OptimizerUtils.class.getName());
-       
        ////////////////////////////////////////////////////////
        // Optimizer constants and flags (incl tuning knobs)  //
        ////////////////////////////////////////////////////////
@@ -364,13 +360,6 @@ public class OptimizerUtils
                        cconf.set(ConfigType.PARALLEL_CP_READ_BINARYFORMATS, 
false);
                        cconf.set(ConfigType.PARALLEL_CP_WRITE_BINARYFORMATS, 
false);
                }
-               else if( InfrastructureAnalyzer.isJavaVersionLessThanJDK8()
-                       && InfrastructureAnalyzer.getLocalParallelism() > 1 )
-               {
-                       LOG.warn("Auto-disable multi-threaded text read for 
'text' and 'csv' due to thread contention on JRE < 1.8"
-                                       + " (java.version="+ 
System.getProperty("java.version")+").");
-                       cconf.set(ConfigType.PARALLEL_CP_READ_TEXTFORMATS, 
false);
-               }
 
                //handle parallel matrix mult / rand configuration
                if (!dmlconf.getBooleanValue(DMLConfig.CP_PARALLEL_OPS)) {

http://git-wip-us.apache.org/repos/asf/systemml/blob/6b877b04/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
index 84fdd88..00710c3 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
@@ -1369,33 +1369,6 @@ public class SparkExecutionContext extends 
ExecutionContext
                        .getDefaultParallelism(refresh);
        }
 
-       public void checkAndRaiseValidationWarningJDKVersion()
-       {
-               //check for jdk version less than jdk8
-               boolean isLtJDK8 = 
InfrastructureAnalyzer.isJavaVersionLessThanJDK8();
-
-               //check multi-threaded executors
-               int numExecutors = getNumExecutors();
-               int numCores = getDefaultParallelism(false);
-               boolean multiThreaded = (numCores > numExecutors);
-
-               //check for jdk version less than 8 (and raise warning if 
multi-threaded)
-               if( isLtJDK8 && multiThreaded)
-               {
-                       //get the jre version
-                       String version = System.getProperty("java.version");
-
-                       
LOG.warn("########################################################################################");
-                       LOG.warn("### WARNING: Multi-threaded text reblock may 
lead to thread contention on JRE < 1.8 ####");
-                       LOG.warn("### java.version = " + version);
-                       LOG.warn("### total number of executors = " + 
numExecutors);
-                       LOG.warn("### total number of cores = " + numCores);
-                       LOG.warn("### JDK-7032154: Performance tuning of 
sun.misc.FloatingDecimal/FormattedFloatingDecimal");
-                       LOG.warn("### Workaround: Convert text to binary w/ 
changed configuration of one executor per core");
-                       
LOG.warn("########################################################################################");
-               }
-       }
-
        /**
         * Captures relevant spark cluster configuration properties, e.g., 
memory budgets and
         * degree of parallelism. This configuration abstracts legacy (< Spark 
1.6) and current

http://git-wip-us.apache.org/repos/asf/systemml/blob/6b877b04/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java
index 1f1fce2..656089f 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java
@@ -40,13 +40,11 @@ import org.apache.sysml.utils.lite.LiteCheck;
  */
 public class InfrastructureAnalyzer 
 {
-       
        public static final long DEFAULT_JVM_SIZE = 512 * 1024 * 1024;
        
        //static local master node properties
        private static int  _localPar        = -1;
        private static long _localJVMMaxMem  = -1;
-       private static boolean _isLtJDK8 = false;
        
        //static hadoop cluster properties
        private static int  _remotePar       = -1;
@@ -60,8 +58,7 @@ public class InfrastructureAnalyzer
        private static boolean _yarnEnabled  = false;
        
        //static initialization, called for each JVM (on each node)
-       static 
-       {
+       static {
                //analyze local node properties
                analyzeLocalMachine();
                
@@ -69,11 +66,6 @@ public class InfrastructureAnalyzer
                //analyzeHadoopCluster(); //note: due to overhead - analyze 
on-demand
        }
 
-       public static boolean isJavaVersionLessThanJDK8()
-       {
-               return _isLtJDK8;
-       }
-       
        ///////
        //methods for obtaining parallelism properties
        
@@ -83,8 +75,7 @@ public class InfrastructureAnalyzer
         * 
         * @return number of local processors of the current node
         */
-       public static int getLocalParallelism()
-       {
+       public static int getLocalParallelism() {
                return _localPar;
        }       
        
@@ -111,8 +102,7 @@ public class InfrastructureAnalyzer
                return _remoteParMap;
        }
 
-       public static void setRemoteParallelMapTasks(int pmap)
-       {
+       public static void setRemoteParallelMapTasks(int pmap) {
                _remoteParMap = pmap;
        }
        
@@ -127,8 +117,7 @@ public class InfrastructureAnalyzer
                return _remoteParReduce;
        }
 
-       public static void setRemoteParallelReduceTasks(int preduce)
-       {
+       public static void setRemoteParallelReduceTasks(int preduce) {
                _remoteParReduce = preduce;
        }
        
@@ -140,13 +129,11 @@ public class InfrastructureAnalyzer
         * 
         * @return maximum memory of the current JVM
         */
-       public static long getLocalMaxMemory()
-       {
+       public static long getLocalMaxMemory() {
                return _localJVMMaxMem;
        }
 
-       public static void setLocalMaxMemory( long localMem )
-       {
+       public static void setLocalMaxMemory( long localMem ) {
                _localJVMMaxMem = localMem;
        }
        
@@ -161,8 +148,7 @@ public class InfrastructureAnalyzer
                return _remoteJVMMaxMemMap;
        }
 
-       public static void setRemoteMaxMemoryMap( long remoteMem )
-       {
+       public static void setRemoteMaxMemoryMap( long remoteMem ) {
                _remoteJVMMaxMemMap = remoteMem;
        }
        
@@ -177,8 +163,7 @@ public class InfrastructureAnalyzer
                return _remoteJVMMaxMemReduce;
        }
 
-       public static void setRemoteMaxMemoryReduce( long remoteMem )
-       {
+       public static void setRemoteMaxMemoryReduce( long remoteMem ) {
                _remoteJVMMaxMemReduce = remoteMem;
        }
 
@@ -199,8 +184,7 @@ public class InfrastructureAnalyzer
                return _localJT;
        }
 
-       public static boolean isLocalMode(JobConf job)
-       {
+       public static boolean isLocalMode(JobConf job) {
                // Due to a bug in HDP related to fetching the "mode" of 
execution within mappers,
                // we explicitly probe the relevant properties instead of 
relying on results from 
                // analyzeHadoopCluster().
@@ -219,8 +203,7 @@ public class InfrastructureAnalyzer
         * 
         * @return maximum local parallelism constraint
         */
-       public static int getCkMaxCP() 
-       {
+       public static int getCkMaxCP() {
                //default value (if not specified)
                return getLocalParallelism();
        }
@@ -230,8 +213,7 @@ public class InfrastructureAnalyzer
         * 
         * @return maximum remote parallelism constraint
         */
-       public static int getCkMaxMR() 
-       {
+       public static int getCkMaxMR() {
                if( OptimizerUtils.isSparkExecutionMode() )
                        return 
SparkExecutionContext.getDefaultParallelism(true);
                else
@@ -243,8 +225,7 @@ public class InfrastructureAnalyzer
         * 
         * @return maximum memory constraint
         */
-       public static long getCmMax() 
-       {
+       public static long getCmMax() {
                //default value (if not specified)
                return Math.min( getLocalMaxMemory(), getRemoteMaxMemoryMap() );
        }
@@ -297,28 +278,22 @@ public class InfrastructureAnalyzer
                return ret;
        }
 
-       public static void setMaxMemoryOpt(JobConf job, String key, long bytes)
-       {
+       public static void setMaxMemoryOpt(JobConf job, String key, long bytes) 
{
                String javaOptsOld = job.get( key );
                String javaOptsNew = null;
-
-               //StringTokenizer st = new StringTokenizer( javaOptsOld, " " );
                String[] tokens = javaOptsOld.split(" "); //account also for no 
' '
                StringBuilder sb = new StringBuilder();
-               for( String arg : tokens )
-               {
-                       if( arg.startsWith("-Xmx") ) //search for max mem
-                       {
+               for( String arg : tokens ) {
+                       if( arg.startsWith("-Xmx") ) { //search for max mem
                                sb.append("-Xmx");
                                sb.append( (bytes/(1024*1024)) );
                                sb.append("M");
                        }
                        else
                                sb.append(arg);
-                       
                        sb.append(" ");
                }
-               javaOptsNew = sb.toString().trim();             
+               javaOptsNew = sb.toString().trim();
                job.set(key, javaOptsNew);
        }
        
@@ -343,22 +318,18 @@ public class InfrastructureAnalyzer
                ClusterStatus stat = client.getClusterStatus();
                
                double ret = 0.0;
-               if( stat != null ) //if in cluster mode
-               {       
-                       if( mapOnly )
-                       {
+               if( stat != null ) { //if in cluster mode
+                       if( mapOnly ) {
                                int capacity = stat.getMaxMapTasks();
                                int current = stat.getMapTasks();
                                ret = ((double)current) / capacity;
                        }
-                       else
-                       {
+                       else {
                                int capacity = stat.getMaxMapTasks() + 
stat.getMaxReduceTasks();
                                int current = stat.getMapTasks() + 
stat.getReduceTasks();
                                ret = ((double)current) / capacity;
                        }
                }
-               
                return ret;
        }
        
@@ -368,31 +339,21 @@ public class InfrastructureAnalyzer
        /**
         * Analyzes properties of local machine and JVM.
         */
-       private static void analyzeLocalMachine()
-       {
+       private static void analyzeLocalMachine() {
                //step 1: basic parallelism and memory
                _localPar       = Runtime.getRuntime().availableProcessors();
                _localJVMMaxMem = Runtime.getRuntime().maxMemory();
-               
-               //step 2: analyze if used jdk older than jdk8
-               String version = System.getProperty("java.version");
-               
-               //check for jdk version less than 8 (and raise warning if 
multi-threaded)
-               _isLtJDK8 = (UtilFunctions.compareVersion(version, "1.8") < 0); 
        }
        
        /**
         * Analyzes properties of hadoop cluster and configuration.
         */
-       private static void analyzeHadoopCluster()
-       {
-               try 
-               {
+       private static void analyzeHadoopCluster() {
+               try {
                        JobConf job = ConfigurationManager.getCachedJobConf();
                        JobClient client = new JobClient(job);
                        ClusterStatus stat = client.getClusterStatus();
-                       if( stat != null ) //if in cluster mode
-                       {
+                       if( stat != null ) { //if in cluster mode
                                //analyze cluster status
                                _remotePar = stat.getTaskTrackers();
                                _remoteParMap = stat.getMaxMapTasks(); 
@@ -400,10 +361,9 @@ public class InfrastructureAnalyzer
                                
                                //analyze pure configuration properties
                                analyzeHadoopConfiguration();
-                       }               
+                       }
                } 
-               catch (IOException e) 
-               {
+               catch (IOException e) {
                        throw new RuntimeException("Unable to analyze 
infrastructure.",e);
                }
        }
@@ -412,12 +372,11 @@ public class InfrastructureAnalyzer
         * Analyzes only properties of hadoop configuration in order to prevent 
         * expensive call to cluster status .
         */
-       private static void analyzeHadoopConfiguration()
-       {
+       private static void analyzeHadoopConfiguration() {
                JobConf job = ConfigurationManager.getCachedJobConf();
                
                _remoteMRSortMem = (1024*1024) * 
job.getLong(MRConfigurationNames.MR_TASK_IO_SORT_MB,100); //1MB
-                       
+               
                //handle jvm max mem (map mem budget is relevant for map-side 
distcache and parfor)
                //(for robustness we probe both: child and map configuration 
parameters)
                String javaOpts1 = 
job.get(MRConfigurationNames.MR_CHILD_JAVA_OPTS); //internally mapred/mapreduce 
synonym
@@ -441,15 +400,13 @@ public class InfrastructureAnalyzer
                _yarnEnabled = (framework!=null && framework.equals("yarn"));
                
                //analyze if local mode (internally requires yarn_enabled)
-               _localJT = analyzeLocalMode(job);               
+               _localJT = analyzeLocalMode(job);
        }
 
-       private static boolean analyzeLocalMode(JobConf job)
-       {
+       private static boolean analyzeLocalMode(JobConf job) {
                //analyze if local mode (if yarn enabled, we always assume 
cluster mode
                //in order to workaround configuration issues on >=Hadoop 2.6)
                String jobTracker = 
job.get(MRConfigurationNames.MR_JOBTRACKER_ADDRESS, "local");
-               return "local".equals(jobTracker)
-                          & !isYarnEnabled();
+               return "local".equals(jobTracker) && !isYarnEnabled();
        }
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/6b877b04/src/main/java/org/apache/sysml/runtime/instructions/spark/CSVReblockSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/CSVReblockSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/CSVReblockSPInstruction.java
index fb80deb..cd8635a 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/CSVReblockSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/CSVReblockSPInstruction.java
@@ -114,9 +114,6 @@ public class CSVReblockSPInstruction extends 
UnarySPInstruction {
                        return;
                }
                
-               //check jdk version (prevent double.parseDouble contention on 
<jdk8)
-               sec.checkAndRaiseValidationWarningJDKVersion();
-               
                //execute matrix/frame csvreblock 
                JavaPairRDD<?,?> out = null;
                if( input1.getDataType() == DataType.MATRIX )

http://git-wip-us.apache.org/repos/asf/systemml/blob/6b877b04/src/main/java/org/apache/sysml/runtime/instructions/spark/ReblockSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/ReblockSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/ReblockSPInstruction.java
index d529084..609b399 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/ReblockSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/ReblockSPInstruction.java
@@ -121,18 +121,14 @@ public class ReblockSPInstruction extends 
UnarySPInstruction {
                MatrixCharacteristics mc = 
sec.getMatrixCharacteristics(input1.getName());
                MatrixCharacteristics mcOut = 
sec.getMatrixCharacteristics(output.getName());
                
-               if(iinfo == InputInfo.TextCellInputInfo || iinfo == 
InputInfo.MatrixMarketInputInfo ) 
-               {
-                       //check jdk version (prevent double.parseDouble 
contention on <jdk8)
-                       sec.checkAndRaiseValidationWarningJDKVersion();
-                       
+               if(iinfo == InputInfo.TextCellInputInfo || iinfo == 
InputInfo.MatrixMarketInputInfo ) {
                        //get the input textcell rdd
-                       JavaPairRDD<LongWritable, Text> lines = 
(JavaPairRDD<LongWritable, Text>) 
-                                       
sec.getRDDHandleForVariable(input1.getName(), iinfo);
+                       JavaPairRDD<LongWritable, Text> lines = 
(JavaPairRDD<LongWritable, Text>)
+                               sec.getRDDHandleForVariable(input1.getName(), 
iinfo);
                        
                        //convert textcell to binary block
-                       JavaPairRDD<MatrixIndexes, MatrixBlock> out = 
-                                       
RDDConverterUtils.textCellToBinaryBlock(sec.getSparkContext(), lines, mcOut, 
outputEmptyBlocks);
+                       JavaPairRDD<MatrixIndexes, MatrixBlock> out =
+                               
RDDConverterUtils.textCellToBinaryBlock(sec.getSparkContext(), lines, mcOut, 
outputEmptyBlocks);
                        
                        //put output RDD handle into symbol table
                        sec.setRDDHandleForVariable(output.getName(), out);
@@ -200,18 +196,14 @@ public class ReblockSPInstruction extends 
UnarySPInstruction {
                FrameObject fo = sec.getFrameObject(input1.getName());
                MatrixCharacteristics mcOut = 
sec.getMatrixCharacteristics(output.getName());
                
-               if(iinfo == InputInfo.TextCellInputInfo ) 
-               {
-                       //check jdk version (prevent double.parseDouble 
contention on <jdk8)
-                       sec.checkAndRaiseValidationWarningJDKVersion();
-                       
+               if(iinfo == InputInfo.TextCellInputInfo ) {
                        //get the input textcell rdd
                        JavaPairRDD<LongWritable, Text> lines = 
(JavaPairRDD<LongWritable, Text>) 
-                                       
sec.getRDDHandleForVariable(input1.getName(), iinfo);
+                               sec.getRDDHandleForVariable(input1.getName(), 
iinfo);
                        
                        //convert textcell to binary block
                        JavaPairRDD<Long, FrameBlock> out = 
-                                       
FrameRDDConverterUtils.textCellToBinaryBlock(sec.getSparkContext(), lines, 
mcOut, fo.getSchema());
+                               
FrameRDDConverterUtils.textCellToBinaryBlock(sec.getSparkContext(), lines, 
mcOut, fo.getSchema());
                        
                        //put output RDD handle into symbol table
                        sec.setRDDHandleForVariable(output.getName(), out);
@@ -225,8 +217,8 @@ public class ReblockSPInstruction extends 
UnarySPInstruction {
                        String delim = ",";
                        boolean fill = false;
                        double fillValue = 0;
-                       if(fo.getFileFormatProperties() instanceof 
CSVFileFormatProperties 
-                          && fo.getFileFormatProperties() != null ) 
+                       if(fo.getFileFormatProperties() instanceof 
CSVFileFormatProperties
+                               && fo.getFileFormatProperties() != null ) 
                        {
                                CSVFileFormatProperties props = 
(CSVFileFormatProperties) fo.getFileFormatProperties();
                                hasHeader = props.hasHeader();
@@ -240,7 +232,7 @@ public class ReblockSPInstruction extends 
UnarySPInstruction {
                }
                else {
                        throw new DMLRuntimeException("The given InputInfo is 
not implemented "
-                                       + "for ReblockSPInstruction: " + 
InputInfo.inputInfoToString(iinfo));
+                               + "for ReblockSPInstruction: " + 
InputInfo.inputInfoToString(iinfo));
                }
        }
 }

Reply via email to