Repository: systemml Updated Branches: refs/heads/master 1cc9a5e9a -> 127cc06d9
[SYSTEMML-2059] Improved parfor optimize and initialize overhead This patch improves the parfor optimization and initialization overhead by (1) removing unnecessary inefficiencies from the creation of parfor optimizer trees, and (2) creating local parfor workers in parallel, which also creates deep copies for update-in-place variables. Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/7cb95667 Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/7cb95667 Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/7cb95667 Branch: refs/heads/master Commit: 7cb956676d6e5f1b0ef5c3095c241149df18c1d7 Parents: 1cc9a5e Author: Matthias Boehm <[email protected]> Authored: Fri Jan 5 16:28:58 2018 -0800 Committer: Matthias Boehm <[email protected]> Committed: Fri Jan 5 16:28:58 2018 -0800 ---------------------------------------------------------------------- .../controlprogram/ParForProgramBlock.java | 16 +++-- .../controlprogram/parfor/opt/OptNode.java | 22 ++----- .../parfor/opt/OptTreeConverter.java | 65 +++++--------------- .../parfor/opt/OptTreePlanMapping.java | 2 +- .../parfor/opt/OptimizationWrapper.java | 2 +- .../runtime/instructions/InstructionParser.java | 64 ++++++++++--------- .../runtime/instructions/InstructionUtils.java | 9 ++- 7 files changed, 70 insertions(+), 110 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/7cb95667/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java index d534fe0..a0c3ef1 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java +++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java @@ -29,6 +29,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.stream.IntStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -750,17 +751,16 @@ public class ParForProgramBlock extends ForProgramBlock try { - // Step 1) init parallel workers, task queue and threads + // Step 1) create task queue and init workers in parallel + // (including preparation of update-in-place variables) LocalTaskQueue<Task> queue = new LocalTaskQueue<>(); Thread[] threads = new Thread[_numThreads]; LocalParWorker[] workers = new LocalParWorker[_numThreads]; - for( int i=0; i<_numThreads; i++ ) { - //create parallel workers as (lazy) deep copies - //including preparation of update-in-place variables + IntStream.range(0, _numThreads).parallel().forEach(i -> { workers[i] = createParallelWorker( _pwIDs[i], queue, ec, i); threads[i] = new Thread( workers[i] ); - threads[i].setPriority(Thread.MAX_PRIORITY); - } + threads[i].setPriority(Thread.MAX_PRIORITY); + }); // start threads (from now on waiting for tasks) for( Thread thread : threads ) @@ -1346,10 +1346,8 @@ public class ParForProgramBlock extends ForProgramBlock * @param ec execution context * @param index the index of the worker * @return local parworker - * @throws DMLRuntimeException if DMLRuntimeException occurs */ private LocalParWorker createParallelWorker(long pwID, LocalTaskQueue<Task> queue, ExecutionContext ec, int index) - throws DMLRuntimeException { LocalParWorker pw = null; @@ -1393,7 +1391,7 @@ public class ParForProgramBlock extends ForProgramBlock pw.setFunctionNames(fnNames); } catch(Exception ex) { - throw new DMLRuntimeException(ex); + throw new RuntimeException(ex); } return pw; http://git-wip-us.apache.org/repos/asf/systemml/blob/7cb95667/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptNode.java 
---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptNode.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptNode.java index 1707d3e..eb7ac0c 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptNode.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptNode.java @@ -22,6 +22,7 @@ package org.apache.sysml.runtime.controlprogram.parfor.opt; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.Iterator; import java.util.Set; import org.apache.commons.lang.ArrayUtils; @@ -92,9 +93,6 @@ public class OptNode private int _k = -1; private HashMap<ParamType,String> _params = null; - //node statistics (only present for physical plans and leaf nodes) - private OptNodeStatistics _stats = null; - //line numbers (for explain) private int _beginLine = -1; private int _endLine = -1; @@ -203,14 +201,6 @@ public class OptNode public void setK(int k) { _k = k; } - - public OptNodeStatistics getStatistics() { - return _stats; - } - - public void setStatistics(OptNodeStatistics stats) { - _stats = stats; - } public boolean exchangeChild(OptNode oldNode, OptNode newNode) { if( isLeaf() ) @@ -334,7 +324,7 @@ public class OptNode } public int getTotalK() { - int k = 1; + int k = 1; if( !isLeaf() ) for( OptNode n : _childs ) k = Math.max(k, n.getTotalK() ); @@ -404,13 +394,13 @@ public class OptNode public void checkAndCleanupLeafNodes() { if( isLeaf() ) return; - for( int i=0; i<_childs.size(); i++ ) { - OptNode n = _childs.get(i); + Iterator<OptNode> iter = _childs.iterator(); + while( iter.hasNext() ) { + OptNode n = iter.next(); n.checkAndCleanupLeafNodes(); if( n.isLeaf() && n._ntype != NodeType.HOP && n._ntype != NodeType.INST && n._ntype != NodeType.FUNCCALL ) { - _childs.remove(i); - i--; + iter.remove(); } } } 
http://git-wip-us.apache.org/repos/asf/systemml/blob/7cb95667/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptTreeConverter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptTreeConverter.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptTreeConverter.java index 69ad37a..65f7299 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptTreeConverter.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptTreeConverter.java @@ -286,10 +286,6 @@ public class OptTreeConverter throw new DMLRuntimeException("Unsupported instruction type."); } - //create statistics - OptNodeStatistics stats = analyzeStatistics(inst, node, vars); - node.setStatistics(stats); - return node; } @@ -465,9 +461,10 @@ public class OptTreeConverter //TODO remove this workaround once this information can be obtained from hops/lops compiler if( node.isCPOnly() ) { - if( containsMRJobInstruction(pb, false, false) ) + boolean isSparkExec = OptimizerUtils.isSparkExecutionMode(); + if( !isSparkExec && containsMRJobInstruction(pb, false, false) ) node.setExecType(ExecType.MR); - else if(containsMRJobInstruction(pb, false, true)) + else if( isSparkExec && containsMRJobInstruction(pb, false, true)) node.setExecType(ExecType.SPARK); } } @@ -622,7 +619,7 @@ public class OptTreeConverter ret = rContainsMRJobInstruction(pb2, inclFunctions); if( ret ) return ret; } - } + } else if ( pb instanceof FunctionProgramBlock ) //includes ExternalFunctionProgramBlock and ExternalFunctionProgramBlockCP) { //do nothing @@ -630,53 +627,25 @@ public class OptTreeConverter else { ret = containsMRJobInstruction(pb, true, true) - | (inclFunctions && containsFunctionCallInstruction(pb)); + || (inclFunctions && containsFunctionCallInstruction(pb)); } return ret; } - public static boolean containsMRJobInstruction( ProgramBlock pb, 
boolean inclCPFile, boolean inclSpark ) - { + public static boolean containsMRJobInstruction( ProgramBlock pb, boolean inclCPFile, boolean inclSpark ) { return containsMRJobInstruction(pb.getInstructions(), inclCPFile, inclSpark); } - public static boolean containsMRJobInstruction( ArrayList<Instruction> instSet, boolean inclCPFile, boolean inclSpark ) - { - boolean ret = false; - if( instSet!=null ) - for( Instruction inst : instSet ) - if( inst instanceof MRJobInstruction - || (inclSpark && inst instanceof SPInstruction) - || (inclCPFile && (inst instanceof MatrixIndexingCPFileInstruction || inst instanceof ParameterizedBuiltinCPFileInstruction))) - { - ret = true; - break; - } - - return ret; + public static boolean containsMRJobInstruction( ArrayList<Instruction> instSet, boolean inclCPFile, boolean inclSpark ) { + return instSet.stream().anyMatch(inst -> inst instanceof MRJobInstruction + || (inclSpark && inst instanceof SPInstruction) + || (inclCPFile && (inst instanceof MatrixIndexingCPFileInstruction || inst instanceof ParameterizedBuiltinCPFileInstruction))); } - public static boolean containsFunctionCallInstruction( ProgramBlock pb ) - { - boolean ret = false; - for( Instruction inst : pb.getInstructions() ) - if( inst instanceof FunctionCallCPInstruction ) - { - ret = true; - break; - } - - return ret; - } - - private static OptNodeStatistics analyzeStatistics(Instruction inst, OptNode on, LocalVariableMap vars) - throws DMLRuntimeException - { - //since the performance test tool for offline profiling has been removed, - //we return default values - - return new OptNodeStatistics(); //default values + public static boolean containsFunctionCallInstruction( ProgramBlock pb ) { + return pb.getInstructions().stream() + .anyMatch(inst -> inst instanceof FunctionCallCPInstruction); } public static void replaceProgramBlock(OptNode parent, OptNode n, ProgramBlock pbOld, ProgramBlock pbNew, boolean rtMap) @@ -700,22 +669,22 @@ public class OptTreeConverter { 
IfProgramBlock ipb = (IfProgramBlock) pbParent; replaceProgramBlock( ipb.getChildBlocksIfBody(), pbOld, pbNew ); - replaceProgramBlock( ipb.getChildBlocksElseBody(), pbOld, pbNew ); + replaceProgramBlock( ipb.getChildBlocksElseBody(), pbOld, pbNew ); } else if( pbParent instanceof WhileProgramBlock ) { WhileProgramBlock wpb = (WhileProgramBlock) pbParent; - replaceProgramBlock( wpb.getChildBlocks(), pbOld, pbNew ); + replaceProgramBlock( wpb.getChildBlocks(), pbOld, pbNew ); } else if( pbParent instanceof ForProgramBlock || pbParent instanceof ParForProgramBlock ) { ForProgramBlock fpb = (ForProgramBlock) pbParent; - replaceProgramBlock( fpb.getChildBlocks(), pbOld, pbNew ); + replaceProgramBlock( fpb.getChildBlocks(), pbOld, pbNew ); } else if( pbParent instanceof FunctionProgramBlock ) { FunctionProgramBlock fpb = (FunctionProgramBlock) pbParent; - replaceProgramBlock( fpb.getChildBlocks(), pbOld, pbNew ); + replaceProgramBlock( fpb.getChildBlocks(), pbOld, pbNew ); } else throw new DMLRuntimeException("Optimizer doesn't support "+pbParent.getClass().getName()); http://git-wip-us.apache.org/repos/asf/systemml/blob/7cb95667/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptTreePlanMapping.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptTreePlanMapping.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptTreePlanMapping.java index 4f9bd69..b44579f 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptTreePlanMapping.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptTreePlanMapping.java @@ -34,7 +34,7 @@ public class OptTreePlanMapping protected IDSequence _idSeq; protected Map<Long, OptNode> _id_optnode; - + public OptTreePlanMapping() { _idSeq = new IDSequence(); 
http://git-wip-us.apache.org/repos/asf/systemml/blob/7cb95667/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizationWrapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizationWrapper.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizationWrapper.java index d7f6150..c77d4d0 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizationWrapper.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizationWrapper.java @@ -111,7 +111,7 @@ public class OptimizationWrapper //set max contraints if not specified int ck = UtilFunctions.toInt( Math.max( InfrastructureAnalyzer.getCkMaxCP(), - InfrastructureAnalyzer.getCkMaxMR() ) * PAR_FACTOR_INFRASTRUCTURE ); + InfrastructureAnalyzer.getCkMaxMR() ) * PAR_FACTOR_INFRASTRUCTURE ); double cm = InfrastructureAnalyzer.getCmMax() * OptimizerUtils.MEM_UTIL_FACTOR; //execute optimizer http://git-wip-us.apache.org/repos/asf/systemml/blob/7cb95667/src/main/java/org/apache/sysml/runtime/instructions/InstructionParser.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/InstructionParser.java b/src/main/java/org/apache/sysml/runtime/instructions/InstructionParser.java index 7a8ef75..e483100 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/InstructionParser.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/InstructionParser.java @@ -28,44 +28,42 @@ import org.apache.sysml.runtime.instructions.spark.SPInstruction.SPType; public class InstructionParser -{ +{ public static Instruction parseSingleInstruction ( String str ) throws DMLRuntimeException - { + { if ( str == null || str.isEmpty() ) return null; - String execType = str.split(Instruction.OPERAND_DELIM)[0]; - if ( 
execType.equalsIgnoreCase(ExecType.CP.toString()) - || execType.equalsIgnoreCase(ExecType.CP_FILE.toString()) ) - { - CPType cptype = InstructionUtils.getCPType(str); - if( cptype == null ) - throw new DMLRuntimeException("Unknown CP instruction: " + str); - return CPInstructionParser.parseSingleInstruction (cptype, str); - } - else if ( execType.equalsIgnoreCase(ExecType.SPARK.toString()) ) - { - SPType sptype = InstructionUtils.getSPType(str); - if( sptype == null ) - throw new DMLRuntimeException("Unknown SPARK instruction: " + str); - return SPInstructionParser.parseSingleInstruction (sptype, str); - } - else if ( execType.equalsIgnoreCase(ExecType.GPU.toString()) ) - { - GPUINSTRUCTION_TYPE gputype = InstructionUtils.getGPUType(str); - if( gputype == null ) - throw new DMLRuntimeException("Unknown GPU instruction: " + str); - return GPUInstructionParser.parseSingleInstruction (gputype, str); - } - else if ( execType.equalsIgnoreCase("MR") ) { - MRType mrtype = InstructionUtils.getMRType(str); - if( mrtype == null ) - throw new DMLRuntimeException("Unknown MR instruction: " + str); - return MRInstructionParser.parseSingleInstruction (mrtype, str); - } - else { - throw new DMLRuntimeException("Unknown execution type in instruction: " + str); + ExecType et = InstructionUtils.getExecType(str); + switch( et ) { + case CP: + case CP_FILE: { + CPType cptype = InstructionUtils.getCPType(str); + if( cptype == null ) + throw new DMLRuntimeException("Unknown CP instruction: " + str); + return CPInstructionParser.parseSingleInstruction (cptype, str); + } + case SPARK: { + SPType sptype = InstructionUtils.getSPType(str); + if( sptype == null ) + throw new DMLRuntimeException("Unknown SPARK instruction: " + str); + return SPInstructionParser.parseSingleInstruction (sptype, str); + } + case GPU: { + GPUINSTRUCTION_TYPE gputype = InstructionUtils.getGPUType(str); + if( gputype == null ) + throw new DMLRuntimeException("Unknown GPU instruction: " + str); + return 
GPUInstructionParser.parseSingleInstruction (gputype, str); + } + case MR: { + MRType mrtype = InstructionUtils.getMRType(str); + if( mrtype == null ) + throw new DMLRuntimeException("Unknown MR instruction: " + str); + return MRInstructionParser.parseSingleInstruction (mrtype, str); + } + default: + throw new DMLRuntimeException("Unknown execution type in instruction: " + str); } } http://git-wip-us.apache.org/repos/asf/systemml/blob/7cb95667/src/main/java/org/apache/sysml/runtime/instructions/InstructionUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/InstructionUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/InstructionUtils.java index 5e829ee..c75a20b 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/InstructionUtils.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/InstructionUtils.java @@ -24,6 +24,7 @@ import java.util.StringTokenizer; import org.apache.sysml.lops.AppendM; import org.apache.sysml.lops.BinaryM; import org.apache.sysml.lops.GroupedAggregateM; +import org.apache.sysml.lops.LopProperties.ExecType; import org.apache.sysml.lops.MapMult; import org.apache.sysml.lops.MapMultChain; import org.apache.sysml.lops.PMMJ; @@ -191,9 +192,13 @@ public class InstructionUtils return ret; } + + public static ExecType getExecType( String str ) { + int ix = str.indexOf(Instruction.OPERAND_DELIM); + return ExecType.valueOf(str.substring(0, ix)); + } - public static String getOpCode( String str ) - { + public static String getOpCode( String str ) { int ix1 = str.indexOf(Instruction.OPERAND_DELIM); int ix2 = str.indexOf(Instruction.OPERAND_DELIM, ix1+1); return str.substring(ix1+1, ix2);
