Repository: systemml
Updated Branches:
  refs/heads/master 09b1533de -> 4c7640b87
[SYSTEMML-1349,2251] Extended parfor optimizer for shared reads

This patch resolves a long-standing shortcoming of the parfor optimizer:
the degree of parallelism was set too low because shared reads were not
handled properly and thus double-counted for each worker. Especially on
modern processors with an increasing number of cores, this became more
important for reducing unnecessary barriers (a too-low parfor degree of
parallelism distributes the remaining parallelism to contained parfor
loops and operations, which require a barrier per operation).

Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/9a089151
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/9a089151
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/9a089151

Branch: refs/heads/master
Commit: 9a089151bf2bc518fe9eaf5b3fe9505f237dc399
Parents: 09b1533
Author: Matthias Boehm <[email protected]>
Authored: Thu Apr 19 18:57:51 2018 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Thu Apr 19 18:57:51 2018 -0700

----------------------------------------------------------------------
 src/main/java/org/apache/sysml/hops/Hop.java | 48 ++++++++--------
 .../parfor/opt/CostEstimator.java            | 22 ++++---
 .../parfor/opt/CostEstimatorHops.java        |  9 ++-
 .../parfor/opt/OptimizerConstrained.java     |  8 +--
 .../parfor/opt/OptimizerRuleBased.java       | 60 +++++++++++++-------
 5 files changed, 86 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/9a089151/src/main/java/org/apache/sysml/hops/Hop.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/Hop.java b/src/main/java/org/apache/sysml/hops/Hop.java
index 9445220..059132d 100644
--- a/src/main/java/org/apache/sysml/hops/Hop.java
+++ b/src/main/java/org/apache/sysml/hops/Hop.java
@@ -20,6 +20,7 @@ package org.apache.sysml.hops;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 
@@ -459,6 +460,19 @@ public abstract class Hop implements ParseInfo
 		return _outputEmptyBlocks;
 	}
 	
+	protected double getInputOutputSize() {
+		return _outputMemEstimate
+			+ _processingMemEstimate
+			+ getInputSize();
+	}
+	
+	public double getInputOutputSize(Collection<String> exclVars) {
+		return _outputMemEstimate
+			+ _processingMemEstimate
+			+ getInputSize(exclVars);
+	}
+	
 	/**
 	 * Returns the memory estimate for the output produced from this Hop.
 	 * It must be invoked only within Hops. From outside Hops, one must
@@ -467,21 +481,22 @@ public abstract class Hop implements ParseInfo
 	 * 
 	 * @return output size memory estimate
 	 */
-	protected double getOutputSize()
-	{
+	protected double getOutputSize() {
 		return _outputMemEstimate;
 	}
+	
+	protected double getInputSize() {
+		return getInputSize(null);
+	}
 	
-	protected double getInputSize()
-	{
-		double sum = 0;
+	protected double getInputSize(Collection<String> exclVars) {
+		double sum = 0;
 		int len = _input.size();
-		
-		for( int i=0; i<len; i++ ) //for all inputs
-		{
+		for( int i=0; i<len; i++ ) { //for all inputs
 			Hop hi = _input.get(i);
+			if( exclVars != null && exclVars.contains(hi.getName()) )
+				continue;
 			double hmout = hi.getOutputMemEstimate();
-			
 			if( hmout > 1024*1024 ) {//for relevant sizes
 				//check if already included in estimate (if an input is used
 				//multiple times it is still only required once in memory)
@@ -491,24 +506,9 @@ public abstract class Hop implements ParseInfo
 					flag |= (hi == _input.get(j));
 				hmout = flag ? 0 : hmout;
 			}
-			
 			sum += hmout;
 		}
-		
-		//for(Hop h : _input ) {
-		//	sum += h._outputMemEstimate;
-		//}
-		
 		return sum;
 	}
-	
-	protected double getInputOutputSize()
-	{
-		double sum = 0;
-		sum += _outputMemEstimate;
-		sum += _processingMemEstimate;
-		sum += getInputSize();
-		return sum;
-	}
 

http://git-wip-us.apache.org/repos/asf/systemml/blob/9a089151/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/CostEstimator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/CostEstimator.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/CostEstimator.java
index 167120f..c726308 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/CostEstimator.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/CostEstimator.java
@@ -26,7 +26,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import org.apache.sysml.lops.LopProperties.ExecType;
-import org.apache.sysml.parser.ParForStatementBlock.ResultVar;
 import org.apache.sysml.runtime.controlprogram.parfor.opt.OptNode.ParamType;
 
 /**
@@ -47,17 +46,20 @@ public abstract class CostEstimator
 	public static final double DEFAULT_MEM_ESTIMATE_MR = 20*1024*1024; //default memory consumption: 20MB
 	
 	public enum TestMeasure {
-		EXEC_TIME,
-		MEMORY_USAGE
+		EXEC_TIME, MEMORY_USAGE
 	}
 	
 	public enum DataFormat {
-		DENSE,
-		SPARSE
+		DENSE, SPARSE
+	}
+	
+	public enum ExcludeType {
+		NONE, SHARED_READ, RESULT_LIX
 	}
 	
 	protected boolean _inclCondPart = false;
-	protected Collection<ResultVar> _exclRetVars = null;
+	protected Collection<String> _exclVars = null;
+	protected ExcludeType _exclType = ExcludeType.NONE;
 	
 	/**
 	 * Main leaf node estimation method - to be overwritten by specific cost estimators
@@ -102,12 +104,14 @@ public abstract class CostEstimator
 		return val;
 	}
 	
-	public double getEstimate(TestMeasure measure, OptNode node, boolean inclCondPart, Collection<ResultVar> retVars) {
+	public double getEstimate(TestMeasure measure, OptNode node, boolean inclCondPart, Collection<String> vars, ExcludeType extype) {
 		_inclCondPart = inclCondPart; //temporary
-		_exclRetVars = retVars;
+		_exclVars = vars;
+		_exclType = extype;
 		double val = getEstimate(measure, node, null);
 		_inclCondPart = false;
-		_exclRetVars = null;
+		_exclVars = null;
+		_exclType = ExcludeType.NONE;
 		return val;
 	}
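The exclusion hooks above (Hop.getInputSize(Collection<String>) and CostEstimator.ExcludeType) are the core of the fix: a memory estimate can now skip selected variables, for example read-only matrices that are shared across all parfor workers. The following standalone sketch (not SystemML code; the class name, inputs, and sizes are illustrative assumptions) mirrors that accounting idea.

// Illustrative only: the accounting idea behind ExcludeType.SHARED_READ.
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

public class SharedReadAccountingSketch {
	//made-up per-input memory estimates in MB; "X" is a read-only matrix shared by all workers
	private static double inputSize(String name) {
		return name.equals("X") ? 1500 : 10;
	}

	//analogous to the new Hop.getInputSize(Collection<String> exclVars):
	//sums the input estimates but skips variables listed as excluded (e.g., shared reads)
	private static double inputSize(List<String> inputs, Collection<String> exclVars) {
		return inputs.stream()
			.filter(in -> exclVars == null || !exclVars.contains(in))
			.mapToDouble(SharedReadAccountingSketch::inputSize).sum();
	}

	public static void main(String[] args) {
		List<String> inputs = Arrays.asList("X", "tmp1", "tmp2");
		double perWorkerAll = inputSize(inputs, null);                     //1520 MB, X charged per worker
		double perWorkerNonShared = inputSize(inputs, Arrays.asList("X")); //20 MB, X charged only once
		System.out.println(perWorkerAll + " MB vs " + perWorkerNonShared + " MB per worker");
	}
}

With the shared input excluded, the hypothetical per-worker estimate drops from 1520 MB to 20 MB, which is what later allows the optimizer rewrite to choose a much higher degree of parallelism.
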
http://git-wip-us.apache.org/repos/asf/systemml/blob/9a089151/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/CostEstimatorHops.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/CostEstimatorHops.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/CostEstimatorHops.java
index 3cb0c6f..4c3e7f0 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/CostEstimatorHops.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/CostEstimatorHops.java
@@ -24,7 +24,6 @@ import org.apache.sysml.hops.Hop;
 import org.apache.sysml.hops.LeftIndexingOp;
 import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.lops.LopProperties.ExecType;
-import org.apache.sysml.parser.ParForStatementBlock.ResultVar;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.parfor.opt.OptNode.NodeType;
 import org.apache.sysml.runtime.controlprogram.parfor.opt.Optimizer.CostModelType;
@@ -61,6 +60,10 @@ public class CostEstimatorHops extends CostEstimator
 		Hop h = _map.getMappedHop( node.getID() );
 		double value = h.getMemEstimate();
 		
+		//correction for disabled shared read accounting
+		value = (_exclVars!=null && _exclType==ExcludeType.SHARED_READ) ?
+			h.getInputOutputSize(_exclVars) : value;
+		
 		//handle specific cases
 		double DEFAULT_MEM_REMOTE = OptimizerUtils.isSparkExecutionMode() ?
 			DEFAULT_MEM_SP : DEFAULT_MEM_MR;
@@ -100,8 +103,8 @@ public class CostEstimatorHops extends CostEstimator
 		}
 		
 		//correction for disabled result indexing
-		value = (_exclRetVars!=null && h instanceof LeftIndexingOp
-			&& ResultVar.contains(_exclRetVars, h.getName())) ? 0 : value;
+		value = (_exclVars!=null && _exclType==ExcludeType.RESULT_LIX
+			&& h instanceof LeftIndexingOp && _exclVars.contains(h.getName())) ? 0 : value;
 		
 		if( LOG.isTraceEnabled() ) {
 			LOG.trace("Memory estimate "+h.getName()+", "+h.getOpString()

http://git-wip-us.apache.org/repos/asf/systemml/blob/9a089151/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
index 4088e63..deedf29 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
@@ -152,7 +152,7 @@ public class OptimizerConstrained extends OptimizerRuleBased
 			super.rewriteSetExportReplicationFactor( pn, ec.getVariables() );
 
 			// rewrite 10: determine parallelism
-			rewriteSetDegreeOfParallelism( pn, M1, false );
+			rewriteSetDegreeOfParallelism( pn, _cost, ec.getVariables(), M1, false );
 
 			// rewrite 11: task partitioning
 			rewriteSetTaskPartitioner( pn, false, flagLIX );
@@ -174,7 +174,7 @@ public class OptimizerConstrained extends OptimizerRuleBased
 		else //if( pn.getExecType() == ExecType.CP )
 		{
 			// rewrite 10: determine parallelism
-			rewriteSetDegreeOfParallelism( pn, M1, false );
+			rewriteSetDegreeOfParallelism( pn, _cost, ec.getVariables(), M1, false );
 
 			// rewrite 11: task partitioning
 			rewriteSetTaskPartitioner( pn, false, false ); //flagLIX always false
@@ -282,7 +282,7 @@ public class OptimizerConstrained extends OptimizerRuleBased
 	///
 
 	@Override
-	protected void rewriteSetDegreeOfParallelism(OptNode n, double M, boolean flagNested) {
+	protected void rewriteSetDegreeOfParallelism(OptNode n, CostEstimator cost, LocalVariableMap vars, double M, boolean flagNested) {
 		// constraint awareness
 		if( n.getK() > 0 && ConfigurationManager.isParallelParFor() )
 		{
@@ -299,7 +299,7 @@ public class OptimizerConstrained extends OptimizerRuleBased
 			LOG.debug(getOptMode()+" OPT: forced 'set degree of parallelism' - result=(see EXPLAIN)" );
 		}
 		else
-			super.rewriteSetDegreeOfParallelism(n, M, flagNested);
+			super.rewriteSetDegreeOfParallelism(n, cost, vars, M, flagNested);
 	}

http://git-wip-us.apache.org/repos/asf/systemml/blob/9a089151/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
index 2bf8fe5..91124a0 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -81,6 +82,7 @@ import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysml.runtime.controlprogram.parfor.ProgramConverter;
 import org.apache.sysml.runtime.controlprogram.parfor.ResultMergeLocalFile;
+import org.apache.sysml.runtime.controlprogram.parfor.opt.CostEstimator.ExcludeType;
 import org.apache.sysml.runtime.controlprogram.parfor.opt.CostEstimator.TestMeasure;
 import org.apache.sysml.runtime.controlprogram.parfor.opt.OptNode.ExecType;
 import org.apache.sysml.runtime.controlprogram.parfor.opt.OptNode.NodeType;
@@ -271,7 +273,7 @@ public class OptimizerRuleBased extends Optimizer
 			rewriteSetExportReplicationFactor( pn, ec.getVariables() );
 
 			// rewrite 10: determine parallelism
-			rewriteSetDegreeOfParallelism( pn, M1, false );
+			rewriteSetDegreeOfParallelism( pn, _cost, ec.getVariables(), M1, false );
 
 			// rewrite 11: task partitioning
 			rewriteSetTaskPartitioner( pn, false, flagLIX );
@@ -292,7 +294,7 @@ public class OptimizerRuleBased extends Optimizer
 		else //if( pn.getExecType() == ExecType.CP )
 		{
 			// rewrite 10: determine parallelism
-			rewriteSetDegreeOfParallelism( pn, M1, false );
+			rewriteSetDegreeOfParallelism( pn, _cost, ec.getVariables(), M1, false );
 
 			// rewrite 11: task partitioning
 			rewriteSetTaskPartitioner( pn, false, false ); //flagLIX always false
@@ -1173,25 +1175,36 @@ public class OptimizerRuleBased extends Optimizer
 	//REWRITE set degree of parallelism
 	///
 
-	protected void rewriteSetDegreeOfParallelism(OptNode n, double M, boolean flagNested)
+	protected void rewriteSetDegreeOfParallelism(OptNode n, CostEstimator cost, LocalVariableMap vars, double M, boolean flagNested)
 	{
 		ExecType type = n.getExecType();
 		long id = n.getID();
-		
+
 		//special handling for different exec models (CP, MR, MR nested)
-		ParForProgramBlock pfpb = (ParForProgramBlock) OptTreeConverter
-			.getAbstractPlanMapping().getMappedProg(id)[1];
+		Object[] map = OptTreeConverter.getAbstractPlanMapping().getMappedProg(id);
+		ParForStatementBlock pfsb = (ParForStatementBlock)map[0];
+		ParForProgramBlock pfpb = (ParForProgramBlock)map[1];
 		
 		if( type == ExecType.CP )
 		{
 			//determine local max parallelism constraint
 			int kMax = ConfigurationManager.isParallelParFor() ?
-				(n.isCPOnly() ? _lkmaxCP : _lkmaxMR) : 1;
+				(n.isCPOnly() ? _lkmaxCP : _lkmaxMR) : 1;
+			
+			//compute memory budgets and partial estimates for handling shared reads
+			double mem = (OptimizerUtils.isSparkExecutionMode() && !n.isCPOnly()) ? _lm/2 : _lm;
+			double sharedM = 0, nonSharedM = M;
+			if( computeMaxK(M, M, 0, mem) < kMax ) { //account for shared read if necessary
+				sharedM = pfsb.getReadOnlyParentVars().stream().map(s -> vars.get(s))
+					.filter(d -> d instanceof MatrixObject).mapToDouble(mo -> OptimizerUtils
+					.estimateSize(((MatrixObject)mo).getMatrixCharacteristics())).sum();
+				nonSharedM = cost.getEstimate(TestMeasure.MEMORY_USAGE, n, true,
+					pfsb.getReadOnlyParentVars(), ExcludeType.SHARED_READ);
+			}
 			
 			//ensure local memory constraint (for spark more conservative in order to
 			//prevent unnecessary guarded collect)
-			double mem = (OptimizerUtils.isSparkExecutionMode() && !n.isCPOnly()) ? _lm/2 : _lm;
-			kMax = Math.min( kMax, (int)Math.floor( mem / M ) );
+			kMax = Math.min( kMax, computeMaxK(M, nonSharedM, sharedM, mem) );
 			kMax = Math.max( kMax, 1);
 			
 			//constrain max parfor parallelism by problem size
@@ -1226,21 +1239,17 @@ public class OptimizerRuleBased extends Optimizer
 		else // ExecType.MR/ExecType.SPARK
 		{
 			int kMax = -1;
-			if( flagNested )
-			{
+			if( flagNested ) {
 				//determine remote max parallelism constraint
 				pfpb.setDegreeOfParallelism( _rnk ); //guaranteed <= _N (see nested)
-				n.setK( _rnk );
-				
+				n.setK( _rnk );
 				kMax = _rkmax / _rnk; //per node (CP only inside)
 			}
-			else //not nested (default)
-			{
+			else { //not nested (default)
 				//determine remote max parallelism constraint
 				int tmpK = (int)((_N<_rk)? _N : _rk);
 				pfpb.setDegreeOfParallelism(tmpK);
-				n.setK(tmpK);
-				
+				n.setK(tmpK);
 				kMax = _rkmax / tmpK; //per node (CP only inside)
 			}
@@ -1252,14 +1261,22 @@ public class OptimizerRuleBased extends Optimizer
 			//disable nested parallelism, if required
 			if( !ALLOW_REMOTE_NESTED_PARALLELISM )
 				kMax = 1;
-			
+
 			//distribute remaining parallelism and recompile parallel instructions
-			rAssignRemainingParallelism( n, kMax, 1 );
-		}
+			rAssignRemainingParallelism( n, kMax, 1 );
+		}
 		
 		_numEvaluatedPlans++;
 		LOG.debug(getOptMode()+" OPT: rewrite 'set degree of parallelism' - result=(see EXPLAIN)" );
 	}
+	
+	private int computeMaxK(double M, double memNonShared, double memShared, double memBudget) {
+		//note: we compute max K for both w/o and w/ shared reads and take the max, because
+		//the latter might reduce the degree of parallelism if shared reads don't dominate
+		int k1 = (int)Math.floor(memBudget / M);
+		int k2 = (int)Math.floor((memBudget-memShared) / memNonShared);
+		return Math.max(k1, k2);
+	}
 	
 	protected void rAssignRemainingParallelism(OptNode n, int parforK, int opsK)
 	{
@@ -1644,7 +1661,8 @@ public class OptimizerRuleBased extends Optimizer
 		double sum = computeTotalSizeResultVariables(retVars, vars, pfpb.getDegreeOfParallelism());
 		
 		//compute memory estimate without result indexing, and total sum per worker
-		double M = cost.getEstimate(TestMeasure.MEMORY_USAGE, pn, true, retVars);
+		double M = cost.getEstimate(TestMeasure.MEMORY_USAGE, pn, true, retVars.stream()
+			.map(var -> var._name).collect(Collectors.toList()), ExcludeType.RESULT_LIX);
 		totalMem = M + sum;
 		
 		//result update in-place for MR/Spark (w/ remote memory constraint)
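
To make the effect of the new computeMaxK concrete, here is a standalone sketch (not part of the patch; the memory budget and sizes are hypothetical) that replicates its arithmetic: if the shared read is charged to every worker, the per-worker estimate M dominates and k stays small; if it is charged once against the budget, the remaining budget is divided only by the non-shared portion.

// Illustrative only: mirrors the computeMaxK arithmetic with made-up numbers.
public class DegreeOfParallelismSketch {
	private static int computeMaxK(double M, double memNonShared, double memShared, double memBudget) {
		int k1 = (int) Math.floor(memBudget / M);                          //shared read counted per worker
		int k2 = (int) Math.floor((memBudget - memShared) / memNonShared); //shared read counted once
		return Math.max(k1, k2);
	}

	public static void main(String[] args) {
		double budget = 8192;    //assumed local memory budget (MB)
		double shared = 1500;    //assumed shared read-only input (MB)
		double nonShared = 100;  //assumed per-worker working memory (MB)
		double M = shared + nonShared; //naive per-worker estimate
		System.out.println("naive k = " + (int) Math.floor(budget / M));                      //5
		System.out.println("shared-aware k = " + computeMaxK(M, nonShared, shared, budget));  //66
	}
}

In the rewrite above, this value is additionally capped by the local parallelism constraint and the problem size before the degree of parallelism is set.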
