Repository: systemml
Updated Branches:
  refs/heads/master 1cc9a5e9a -> 127cc06d9


[SYSTEMML-2059] Reduced parfor optimization and initialization overhead

This patch reduces the parfor optimization and initialization overhead
by (1) removing unnecessary inefficiencies from the creation of parfor
optimizer trees, and (2) creating the local parfor workers in parallel,
which includes creating deep copies of update-in-place variables.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/7cb95667
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/7cb95667
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/7cb95667

Branch: refs/heads/master
Commit: 7cb956676d6e5f1b0ef5c3095c241149df18c1d7
Parents: 1cc9a5e
Author: Matthias Boehm <[email protected]>
Authored: Fri Jan 5 16:28:58 2018 -0800
Committer: Matthias Boehm <[email protected]>
Committed: Fri Jan 5 16:28:58 2018 -0800

----------------------------------------------------------------------
 .../controlprogram/ParForProgramBlock.java      | 16 +++--
 .../controlprogram/parfor/opt/OptNode.java      | 22 ++-----
 .../parfor/opt/OptTreeConverter.java            | 65 +++++---------------
 .../parfor/opt/OptTreePlanMapping.java          |  2 +-
 .../parfor/opt/OptimizationWrapper.java         |  2 +-
 .../runtime/instructions/InstructionParser.java | 64 ++++++++++---------
 .../runtime/instructions/InstructionUtils.java  |  9 ++-
 7 files changed, 70 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/7cb95667/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java 
b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
index d534fe0..a0c3ef1 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
@@ -29,6 +29,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.stream.IntStream;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -750,17 +751,16 @@ public class ParForProgramBlock extends ForProgramBlock
                
                try
                {
-                       // Step 1) init parallel workers, task queue and threads
+                       // Step 1) create task queue and init workers in 
parallel
+                       // (including preparation of update-in-place variables)
                        LocalTaskQueue<Task> queue = new LocalTaskQueue<>();
                        Thread[] threads         = new Thread[_numThreads];
                        LocalParWorker[] workers = new 
LocalParWorker[_numThreads];
-                       for( int i=0; i<_numThreads; i++ ) {
-                               //create parallel workers as (lazy) deep copies
-                               //including preparation of update-in-place 
variables
+                       IntStream.range(0, _numThreads).parallel().forEach(i -> 
{
                                workers[i] = createParallelWorker( _pwIDs[i], 
queue, ec, i);
                                threads[i] = new Thread( workers[i] );
-                               threads[i].setPriority(Thread.MAX_PRIORITY); 
-                       }
+                               threads[i].setPriority(Thread.MAX_PRIORITY);
+                       });
                        
                        // start threads (from now on waiting for tasks)
                        for( Thread thread : threads )
@@ -1346,10 +1346,8 @@ public class ParForProgramBlock extends ForProgramBlock
         * @param ec execution context
         * @param index the index of the worker
         * @return local parworker
-        * @throws DMLRuntimeException if DMLRuntimeException occurs
         */
        private LocalParWorker createParallelWorker(long pwID, 
LocalTaskQueue<Task> queue, ExecutionContext ec, int index)
-               throws DMLRuntimeException
        {
                LocalParWorker pw = null; 
                
@@ -1393,7 +1391,7 @@ public class ParForProgramBlock extends ForProgramBlock
                        pw.setFunctionNames(fnNames);
                }
                catch(Exception ex) {
-                       throw new DMLRuntimeException(ex);
+                       throw new RuntimeException(ex);
                }
                
                return pw;

http://git-wip-us.apache.org/repos/asf/systemml/blob/7cb95667/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptNode.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptNode.java 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptNode.java
index 1707d3e..eb7ac0c 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptNode.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptNode.java
@@ -22,6 +22,7 @@ package org.apache.sysml.runtime.controlprogram.parfor.opt;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Set;
 
 import org.apache.commons.lang.ArrayUtils;
@@ -92,9 +93,6 @@ public class OptNode
        private int                       _k       = -1;
        private HashMap<ParamType,String> _params  = null;
        
-       //node statistics (only present for physical plans and leaf nodes)
-       private OptNodeStatistics         _stats   = null;
-       
        //line numbers (for explain)
        private int                       _beginLine = -1;
        private int                       _endLine = -1;
@@ -203,14 +201,6 @@ public class OptNode
        public void setK(int k) {
                _k = k;
        }
-       
-       public OptNodeStatistics getStatistics() {
-               return _stats;
-       }
-       
-       public void setStatistics(OptNodeStatistics stats) {
-               _stats = stats;
-       }
 
        public boolean exchangeChild(OptNode oldNode, OptNode newNode) {
                if( isLeaf() )
@@ -334,7 +324,7 @@ public class OptNode
        }
 
        public int getTotalK() {
-               int k = 1;              
+               int k = 1;
                if( !isLeaf() )
                        for( OptNode n : _childs )
                                k = Math.max(k, n.getTotalK() );
@@ -404,13 +394,13 @@ public class OptNode
        public void checkAndCleanupLeafNodes() {
                if( isLeaf() )
                        return;
-               for( int i=0; i<_childs.size(); i++ ) {
-                       OptNode n = _childs.get(i);
+               Iterator<OptNode> iter = _childs.iterator();
+               while( iter.hasNext() ) {
+                       OptNode n = iter.next();
                        n.checkAndCleanupLeafNodes();
                        if( n.isLeaf() && n._ntype != NodeType.HOP && n._ntype 
!= NodeType.INST 
                                && n._ntype != NodeType.FUNCCALL ) {
-                               _childs.remove(i);
-                               i--;
+                               iter.remove();
                        }
                }
        }

http://git-wip-us.apache.org/repos/asf/systemml/blob/7cb95667/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptTreeConverter.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptTreeConverter.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptTreeConverter.java
index 69ad37a..65f7299 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptTreeConverter.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptTreeConverter.java
@@ -286,10 +286,6 @@ public class OptTreeConverter
                                throw new DMLRuntimeException("Unsupported 
instruction type.");
                }
                
-               //create statistics 
-               OptNodeStatistics stats = analyzeStatistics(inst, node, vars);
-               node.setStatistics(stats);
-               
                return node;
        }
 
@@ -465,9 +461,10 @@ public class OptTreeConverter
                        
                        //TODO remove this workaround once this information can 
be obtained from hops/lops compiler
                        if( node.isCPOnly() ) {
-                               if( containsMRJobInstruction(pb, false, false) )
+                               boolean isSparkExec = 
OptimizerUtils.isSparkExecutionMode();
+                               if( !isSparkExec && 
containsMRJobInstruction(pb, false, false) )
                                        node.setExecType(ExecType.MR);
-                               else if(containsMRJobInstruction(pb, false, 
true))
+                               else if( isSparkExec && 
containsMRJobInstruction(pb, false, true))
                                        node.setExecType(ExecType.SPARK);
                        }
                }
@@ -622,7 +619,7 @@ public class OptTreeConverter
                                ret = rContainsMRJobInstruction(pb2, 
inclFunctions);
                                if( ret ) return ret;
                        }
-               }               
+               }
                else if (  pb instanceof FunctionProgramBlock ) //includes 
ExternalFunctionProgramBlock and ExternalFunctionProgramBlockCP)
                {
                        //do nothing
@@ -630,53 +627,25 @@ public class OptTreeConverter
                else 
                {
                        ret =   containsMRJobInstruction(pb, true, true)
-                             | (inclFunctions && 
containsFunctionCallInstruction(pb));
+                               || (inclFunctions && 
containsFunctionCallInstruction(pb));
                }
 
                return ret;
        }
 
-       public static boolean containsMRJobInstruction( ProgramBlock pb, 
boolean inclCPFile, boolean inclSpark )
-       {
+       public static boolean containsMRJobInstruction( ProgramBlock pb, 
boolean inclCPFile, boolean inclSpark ) {
                return containsMRJobInstruction(pb.getInstructions(), 
inclCPFile, inclSpark);
        }
 
-       public static boolean containsMRJobInstruction( ArrayList<Instruction> 
instSet, boolean inclCPFile, boolean inclSpark )
-       {
-               boolean ret = false;
-               if( instSet!=null )
-                       for( Instruction inst : instSet )
-                               if(    inst instanceof MRJobInstruction
-                                       || (inclSpark && inst instanceof 
SPInstruction) 
-                                       || (inclCPFile && (inst instanceof 
MatrixIndexingCPFileInstruction || inst instanceof 
ParameterizedBuiltinCPFileInstruction)))
-                               {
-                                       ret = true;
-                                       break;
-                               }
-
-               return ret;
+       public static boolean containsMRJobInstruction( ArrayList<Instruction> 
instSet, boolean inclCPFile, boolean inclSpark ) {
+               return instSet.stream().anyMatch(inst -> inst instanceof 
MRJobInstruction
+                       || (inclSpark && inst instanceof SPInstruction)
+                       || (inclCPFile && (inst instanceof 
MatrixIndexingCPFileInstruction || inst instanceof 
ParameterizedBuiltinCPFileInstruction)));
        }
 
-       public static boolean containsFunctionCallInstruction( ProgramBlock pb )
-       {
-               boolean ret = false;
-               for( Instruction inst : pb.getInstructions() )
-                       if( inst instanceof FunctionCallCPInstruction )
-                       {
-                               ret = true;
-                               break;
-                       }
-
-               return ret;
-       }       
-
-       private static OptNodeStatistics analyzeStatistics(Instruction inst, 
OptNode on, LocalVariableMap vars) 
-               throws DMLRuntimeException 
-       {
-               //since the performance test tool for offline profiling has 
been removed,
-               //we return default values
-               
-               return new OptNodeStatistics(); //default values
+       public static boolean containsFunctionCallInstruction( ProgramBlock pb 
) {
+               return pb.getInstructions().stream()
+                       .anyMatch(inst -> inst instanceof 
FunctionCallCPInstruction);
        }
 
        public static void replaceProgramBlock(OptNode parent, OptNode n, 
ProgramBlock pbOld, ProgramBlock pbNew, boolean rtMap) 
@@ -700,22 +669,22 @@ public class OptTreeConverter
                {
                        IfProgramBlock ipb = (IfProgramBlock) pbParent;
                        replaceProgramBlock( ipb.getChildBlocksIfBody(), pbOld, 
pbNew );
-                       replaceProgramBlock( ipb.getChildBlocksElseBody(), 
pbOld, pbNew );                              
+                       replaceProgramBlock( ipb.getChildBlocksElseBody(), 
pbOld, pbNew );
                }
                else if( pbParent instanceof WhileProgramBlock )
                {
                        WhileProgramBlock wpb = (WhileProgramBlock) pbParent;
-                       replaceProgramBlock( wpb.getChildBlocks(), pbOld, pbNew 
);                      
+                       replaceProgramBlock( wpb.getChildBlocks(), pbOld, pbNew 
);
                }
                else if( pbParent instanceof ForProgramBlock || pbParent 
instanceof ParForProgramBlock )
                {
                        ForProgramBlock fpb = (ForProgramBlock) pbParent;
-                       replaceProgramBlock( fpb.getChildBlocks(), pbOld, pbNew 
);      
+                       replaceProgramBlock( fpb.getChildBlocks(), pbOld, pbNew 
);
                }
                else if( pbParent instanceof FunctionProgramBlock )
                {
                        FunctionProgramBlock fpb = (FunctionProgramBlock) 
pbParent;
-                       replaceProgramBlock( fpb.getChildBlocks(), pbOld, pbNew 
);      
+                       replaceProgramBlock( fpb.getChildBlocks(), pbOld, pbNew 
);
                }
                else
                        throw new DMLRuntimeException("Optimizer doesn't 
support "+pbParent.getClass().getName());

http://git-wip-us.apache.org/repos/asf/systemml/blob/7cb95667/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptTreePlanMapping.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptTreePlanMapping.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptTreePlanMapping.java
index 4f9bd69..b44579f 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptTreePlanMapping.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptTreePlanMapping.java
@@ -34,7 +34,7 @@ public class OptTreePlanMapping
        
        protected IDSequence _idSeq;
        protected Map<Long, OptNode> _id_optnode;
-    
+           
        public OptTreePlanMapping()
        {
                _idSeq = new IDSequence();

http://git-wip-us.apache.org/repos/asf/systemml/blob/7cb95667/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizationWrapper.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizationWrapper.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizationWrapper.java
index d7f6150..c77d4d0 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizationWrapper.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizationWrapper.java
@@ -111,7 +111,7 @@ public class OptimizationWrapper
                
                //set max contraints if not specified
                int ck = UtilFunctions.toInt( Math.max( 
InfrastructureAnalyzer.getCkMaxCP(),
-                                                                       
InfrastructureAnalyzer.getCkMaxMR() ) * PAR_FACTOR_INFRASTRUCTURE );
+                       InfrastructureAnalyzer.getCkMaxMR() ) * 
PAR_FACTOR_INFRASTRUCTURE );
                double cm = InfrastructureAnalyzer.getCmMax() * 
OptimizerUtils.MEM_UTIL_FACTOR; 
                
                //execute optimizer

http://git-wip-us.apache.org/repos/asf/systemml/blob/7cb95667/src/main/java/org/apache/sysml/runtime/instructions/InstructionParser.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/InstructionParser.java 
b/src/main/java/org/apache/sysml/runtime/instructions/InstructionParser.java
index 7a8ef75..e483100 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/InstructionParser.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/InstructionParser.java
@@ -28,44 +28,42 @@ import 
org.apache.sysml.runtime.instructions.spark.SPInstruction.SPType;
 
 
 public class InstructionParser 
-{              
+{
        public static Instruction parseSingleInstruction ( String str ) 
                throws DMLRuntimeException 
-       {       
+       {
                if ( str == null || str.isEmpty() )
                        return null;
                
-               String execType = str.split(Instruction.OPERAND_DELIM)[0]; 
-               if (   execType.equalsIgnoreCase(ExecType.CP.toString()) 
-                       || 
execType.equalsIgnoreCase(ExecType.CP_FILE.toString()) ) 
-               {
-                       CPType cptype = InstructionUtils.getCPType(str); 
-                       if( cptype == null )
-                               throw new DMLRuntimeException("Unknown CP 
instruction: " + str);
-                       return CPInstructionParser.parseSingleInstruction 
(cptype, str);
-               }
-               else if ( execType.equalsIgnoreCase(ExecType.SPARK.toString()) 
) 
-               {
-                       SPType sptype = InstructionUtils.getSPType(str); 
-                       if( sptype == null )
-                               throw new DMLRuntimeException("Unknown SPARK 
instruction: " + str);
-                       return SPInstructionParser.parseSingleInstruction 
(sptype, str);
-               }
-               else if ( execType.equalsIgnoreCase(ExecType.GPU.toString()) ) 
-               {
-                       GPUINSTRUCTION_TYPE gputype = 
InstructionUtils.getGPUType(str); 
-                       if( gputype == null )
-                               throw new DMLRuntimeException("Unknown GPU 
instruction: " + str);
-                       return GPUInstructionParser.parseSingleInstruction 
(gputype, str);
-               }
-               else if ( execType.equalsIgnoreCase("MR") ) {
-                       MRType mrtype = InstructionUtils.getMRType(str); 
-                       if( mrtype == null )
-                               throw new DMLRuntimeException("Unknown MR 
instruction: " + str);
-                       return MRInstructionParser.parseSingleInstruction 
(mrtype, str);
-               }
-               else {
-                       throw new DMLRuntimeException("Unknown execution type 
in instruction: " + str);
+               ExecType et = InstructionUtils.getExecType(str);
+               switch( et ) {
+                       case CP:
+                       case CP_FILE: {
+                               CPType cptype = InstructionUtils.getCPType(str);
+                               if( cptype == null )
+                                       throw new DMLRuntimeException("Unknown 
CP instruction: " + str);
+                               return 
CPInstructionParser.parseSingleInstruction (cptype, str);
+                       }
+                       case SPARK: {
+                               SPType sptype = InstructionUtils.getSPType(str);
+                               if( sptype == null )
+                                       throw new DMLRuntimeException("Unknown 
SPARK instruction: " + str);
+                               return 
SPInstructionParser.parseSingleInstruction (sptype, str);
+                       }
+                       case GPU: {
+                               GPUINSTRUCTION_TYPE gputype = 
InstructionUtils.getGPUType(str);
+                               if( gputype == null )
+                                       throw new DMLRuntimeException("Unknown 
GPU instruction: " + str);
+                               return 
GPUInstructionParser.parseSingleInstruction (gputype, str);
+                       }
+                       case MR: {
+                               MRType mrtype = InstructionUtils.getMRType(str);
+                               if( mrtype == null )
+                                       throw new DMLRuntimeException("Unknown 
MR instruction: " + str);
+                               return 
MRInstructionParser.parseSingleInstruction (mrtype, str);
+                       }
+                       default:
+                               throw new DMLRuntimeException("Unknown 
execution type in instruction: " + str);
                }
        }
        

http://git-wip-us.apache.org/repos/asf/systemml/blob/7cb95667/src/main/java/org/apache/sysml/runtime/instructions/InstructionUtils.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/InstructionUtils.java 
b/src/main/java/org/apache/sysml/runtime/instructions/InstructionUtils.java
index 5e829ee..c75a20b 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/InstructionUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/InstructionUtils.java
@@ -24,6 +24,7 @@ import java.util.StringTokenizer;
 import org.apache.sysml.lops.AppendM;
 import org.apache.sysml.lops.BinaryM;
 import org.apache.sysml.lops.GroupedAggregateM;
+import org.apache.sysml.lops.LopProperties.ExecType;
 import org.apache.sysml.lops.MapMult;
 import org.apache.sysml.lops.MapMultChain;
 import org.apache.sysml.lops.PMMJ;
@@ -191,9 +192,13 @@ public class InstructionUtils
                
                return ret;
        }
+       
+       public static ExecType getExecType( String str ) {
+               int ix = str.indexOf(Instruction.OPERAND_DELIM);
+               return ExecType.valueOf(str.substring(0, ix));
+       }
 
-       public static String getOpCode( String str ) 
-       {
+       public static String getOpCode( String str ) {
                int ix1 = str.indexOf(Instruction.OPERAND_DELIM);
                int ix2 = str.indexOf(Instruction.OPERAND_DELIM, ix1+1);
                return str.substring(ix1+1, ix2);

Reply via email to