[SYSTEMML-1336] Improved parfor optimizer (conditional partitioning) This patch improves the parfor optimizer to consider what-if scenarios with conditional partitioning. This avoids falling back to local parfor plans with a small degree of parallelism (when the data barely fits in the driver) in cases where a fused partition-execute parfor job could have been applied instead.
For example, on perftest 8GB univariate-stats, it improved the end-to-end runtime (incl spark context creation and I/O) from 781s to 110s. Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/2f7fa8d7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/2f7fa8d7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/2f7fa8d7 Branch: refs/heads/master Commit: 2f7fa8d73fa9680df283444627209a31c5ef4acd Parents: 35da413 Author: Matthias Boehm <[email protected]> Authored: Thu Feb 23 22:19:24 2017 -0800 Committer: Matthias Boehm <[email protected]> Committed: Fri Feb 24 12:27:29 2017 -0800 ---------------------------------------------------------------------- .../parfor/opt/CostEstimator.java | 26 +++++-- .../controlprogram/parfor/opt/OptNode.java | 2 + .../parfor/opt/OptimizerConstrained.java | 37 ++++++---- .../parfor/opt/OptimizerRuleBased.java | 75 +++++++++++--------- 4 files changed, 90 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2f7fa8d7/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/CostEstimator.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/CostEstimator.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/CostEstimator.java index bb3ca88..3fdf8bd 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/CostEstimator.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/CostEstimator.java @@ -55,6 +55,8 @@ public abstract class CostEstimator SPARSE } + protected boolean _inclCondPart = false; + /** * Main leaf node estimation method - to be overwritten by specific cost estimators * @@ -88,6 +90,7 @@ public abstract class CostEstimator * * 
@param measure ? * @param node internal representation of a plan alternative for program blocks and instructions + * @param inclCondPart including conditional partitioning * @return estimate? * @throws DMLRuntimeException if DMLRuntimeException occurs */ @@ -97,13 +100,26 @@ public abstract class CostEstimator return getEstimate(measure, node, null); } + public double getEstimate( TestMeasure measure, OptNode node, boolean inclCondPart ) + throws DMLRuntimeException + { + //temporarily change local flag and get estimate + boolean oldInclCondPart = _inclCondPart; + _inclCondPart = inclCondPart; + double val = getEstimate(measure, node, null); + + //reset local flag and return + _inclCondPart = oldInclCondPart; + return val; + } + /** * Main estimation method. * - * @param measure ? - * @param node internal representation of a plan alternative for program blocks and instructions + * @param measure estimate type (time or memory) + * @param node plan opt tree node * @param et execution type - * @return estimate? 
+ * @return estimate * @throws DMLRuntimeException if DMLRuntimeException occurs */ public double getEstimate( TestMeasure measure, OptNode node, ExecType et ) @@ -113,7 +129,9 @@ public abstract class CostEstimator if( node.isLeaf() ) { - if( et != null ) + if( _inclCondPart && node.getParam(ParamType.DATA_PARTITION_COND_MEM) != null ) + val = Double.parseDouble(node.getParam(ParamType.DATA_PARTITION_COND_MEM)); + else if( et != null ) val = getLeafNodeEstimate(measure, node, et); //forced type else val = getLeafNodeEstimate(measure, node); //default http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2f7fa8d7/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptNode.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptNode.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptNode.java index 7968c6a..26c30d4 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptNode.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptNode.java @@ -71,6 +71,8 @@ public class OptNode TASK_SIZE, DATA_PARTITIONER, DATA_PARTITION_FORMAT, + DATA_PARTITION_COND, + DATA_PARTITION_COND_MEM, RESULT_MERGE, NUM_ITERATIONS, RECURSIVE_CALL http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2f7fa8d7/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java index 39e742f..6edcec3 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java @@ -80,7 
+80,6 @@ public class OptimizerConstrained extends OptimizerRuleBased LOG.debug("--- "+getOptMode()+" OPTIMIZER -------"); OptNode pn = plan.getRoot(); - double M0 = -1, M1 = -1, M2 = -1; //memory consumption //early abort for empty parfor body if( pn.isLeaf() ) @@ -100,35 +99,45 @@ public class OptimizerConstrained extends OptimizerRuleBased ExecType oldET = pn.getExecType(); int oldK = pn.getK(); pn.setSerialParFor(); //for basic mem consumption - M0 = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn); + double M0a = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn); pn.setExecType(oldET); pn.setK(oldK); - LOG.debug(getOptMode()+" OPT: estimated mem (serial exec) M="+toMB(M0) ); + LOG.debug(getOptMode()+" OPT: estimated mem (serial exec) M="+toMB(M0a) ); //OPTIMIZE PARFOR PLAN // rewrite 1: data partitioning (incl. log. recompile RIX) HashMap<String, PDataPartitionFormat> partitionedMatrices = new HashMap<String,PDataPartitionFormat>(); - rewriteSetDataPartitioner( pn, ec.getVariables(), partitionedMatrices ); - M1 = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn); //reestimate + rewriteSetDataPartitioner( pn, ec.getVariables(), partitionedMatrices, OptimizerUtils.getLocalMemBudget() ); + double M0b = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn); //reestimate // rewrite 2: remove unnecessary compare matrix rewriteRemoveUnnecessaryCompareMatrix(pn, ec); // rewrite 3: rewrite result partitioning (incl. 
log/phy recompile LIX) - boolean flagLIX = super.rewriteSetResultPartitioning( pn, M1, ec.getVariables() ); - M1 = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn); //reestimate - M2 = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn, LopProperties.ExecType.CP); + boolean flagLIX = super.rewriteSetResultPartitioning( pn, M0b, ec.getVariables() ); + double M1 = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn); //reestimate LOG.debug(getOptMode()+" OPT: estimated new mem (serial exec) M="+toMB(M1) ); + + //determine memory consumption for what-if: all-cp or partitioned + double M2 = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn, LopProperties.ExecType.CP); LOG.debug(getOptMode()+" OPT: estimated new mem (serial exec, all CP) M="+toMB(M2) ); - + double M3 = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn, true); + LOG.debug(getOptMode()+" OPT: estimated new mem (cond partitioning) M="+toMB(M3) ); + // rewrite 4: execution strategy PExecMode tmpmode = getPExecMode(pn); //keep old - boolean flagRecompMR = rewriteSetExecutionStategy( pn, M0, M1, M2, flagLIX ); + boolean flagRecompMR = rewriteSetExecutionStategy( pn, M0a, M1, M2, M3, flagLIX ); //exec-type-specific rewrites if( pn.getExecType() == ExecType.MR || pn.getExecType() == ExecType.SPARK ) { + if( M1 > _rm && M3 <= _rm ) { + // rewrite 1: data partitioning (apply conditional partitioning) + rewriteSetDataPartitioner( pn, ec.getVariables(), partitionedMatrices, M3 ); + M1 = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn); //reestimate + } + if( flagRecompMR ){ //rewrite 5: set operations exec type rewriteSetOperationsExecType( pn, flagRecompMR ); @@ -221,7 +230,7 @@ public class OptimizerConstrained extends OptimizerRuleBased /// @Override - protected boolean rewriteSetDataPartitioner(OptNode n, LocalVariableMap vars, HashMap<String,PDataPartitionFormat> partitionedMatrices) + protected boolean rewriteSetDataPartitioner(OptNode n, LocalVariableMap vars, HashMap<String,PDataPartitionFormat> partitionedMatrices, double 
thetaM) throws DMLRuntimeException { boolean blockwise = false; @@ -235,7 +244,7 @@ public class OptimizerConstrained extends OptimizerRuleBased LOG.debug(getOptMode()+" OPT: forced 'set data partitioner' - result="+n.getParam(ParamType.DATA_PARTITIONER) ); } else - super.rewriteSetDataPartitioner(n, vars, partitionedMatrices); + super.rewriteSetDataPartitioner(n, vars, partitionedMatrices, thetaM); return blockwise; } @@ -246,7 +255,7 @@ public class OptimizerConstrained extends OptimizerRuleBased /// @Override - protected boolean rewriteSetExecutionStategy(OptNode n, double M0, double M, double M2, boolean flagLIX) + protected boolean rewriteSetExecutionStategy(OptNode n, double M0, double M, double M2, double M3, boolean flagLIX) throws DMLRuntimeException { boolean ret = false; @@ -270,7 +279,7 @@ public class OptimizerConstrained extends OptimizerRuleBased LOG.debug(getOptMode()+" OPT: forced 'set execution strategy' - result="+mode ); } else - ret = super.rewriteSetExecutionStategy(n, M0, M, M2, flagLIX); + ret = super.rewriteSetExecutionStategy(n, M0, M, M2, M3, flagLIX); return ret; } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2f7fa8d7/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java index bbe5bf7..87cabaa 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java @@ -216,7 +216,6 @@ public class OptimizerRuleBased extends Optimizer LOG.debug("--- "+getOptMode()+" OPTIMIZER -------"); OptNode pn = plan.getRoot(); - double M0 = -1, M1 = -1, M2 = -1; //memory consumption //early abort for empty parfor body 
if( pn.isLeaf() ) @@ -234,32 +233,42 @@ public class OptimizerRuleBased extends Optimizer //ESTIMATE memory consumption pn.setSerialParFor(); //for basic mem consumption - M0 = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn); - LOG.debug(getOptMode()+" OPT: estimated mem (serial exec) M="+toMB(M0) ); + double M0a = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn); + LOG.debug(getOptMode()+" OPT: estimated mem (serial exec) M="+toMB(M0a) ); //OPTIMIZE PARFOR PLAN - // rewrite 1: data partitioning (incl. log. recompile RIX) + // rewrite 1: data partitioning (incl. log. recompile RIX and flag opt nodes) HashMap<String, PDataPartitionFormat> partitionedMatrices = new HashMap<String,PDataPartitionFormat>(); - rewriteSetDataPartitioner( pn, ec.getVariables(), partitionedMatrices ); - M1 = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn); //reestimate + rewriteSetDataPartitioner( pn, ec.getVariables(), partitionedMatrices, OptimizerUtils.getLocalMemBudget() ); + double M0b = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn); //reestimate // rewrite 2: remove unnecessary compare matrix (before result partitioning) rewriteRemoveUnnecessaryCompareMatrix(pn, ec); // rewrite 3: rewrite result partitioning (incl. 
log/phy recompile LIX) - boolean flagLIX = rewriteSetResultPartitioning( pn, M1, ec.getVariables() ); - M1 = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn); //reestimate - M2 = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn, LopProperties.ExecType.CP); + boolean flagLIX = rewriteSetResultPartitioning( pn, M0b, ec.getVariables() ); + double M1 = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn); //reestimate LOG.debug(getOptMode()+" OPT: estimated new mem (serial exec) M="+toMB(M1) ); + + //determine memory consumption for what-if: all-cp or partitioned + double M2 = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn, LopProperties.ExecType.CP); LOG.debug(getOptMode()+" OPT: estimated new mem (serial exec, all CP) M="+toMB(M2) ); + double M3 = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn, true); + LOG.debug(getOptMode()+" OPT: estimated new mem (cond partitioning) M="+toMB(M3) ); // rewrite 4: execution strategy - boolean flagRecompMR = rewriteSetExecutionStategy( pn, M0, M1, M2, flagLIX ); + boolean flagRecompMR = rewriteSetExecutionStategy( pn, M0a, M1, M2, M3, flagLIX ); //exec-type-specific rewrites if( pn.getExecType() == ExecType.MR || pn.getExecType()==ExecType.SPARK ) { + if( M1 > _rm && M3 <= _rm ) { + // rewrite 1: data partitioning (apply conditional partitioning) + rewriteSetDataPartitioner( pn, ec.getVariables(), partitionedMatrices, M3 ); + M1 = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn); //reestimate + } + if( flagRecompMR ){ //rewrite 5: set operations exec type rewriteSetOperationsExecType( pn, flagRecompMR ); @@ -390,7 +399,7 @@ public class OptimizerRuleBased extends Optimizer //REWRITE set data partitioner /// - protected boolean rewriteSetDataPartitioner(OptNode n, LocalVariableMap vars, HashMap<String, PDataPartitionFormat> partitionedMatrices ) + protected boolean rewriteSetDataPartitioner(OptNode n, LocalVariableMap vars, HashMap<String, PDataPartitionFormat> partitionedMatrices, double thetaM ) throws DMLRuntimeException { if( 
n.getNodeType() != NodeType.PARFOR ) @@ -414,16 +423,15 @@ public class OptimizerRuleBased extends Optimizer for( String c : cand ) { PDataPartitionFormat dpf = pfsb.determineDataPartitionFormat( c ); - //System.out.println("Partitioning Format: "+dpf); + if( dpf != PDataPartitionFormat.NONE - && dpf != PDataPartitionFormat.BLOCK_WISE_M_N ) //FIXME + && dpf != PDataPartitionFormat.BLOCK_WISE_M_N ) { cand2.put( c, dpf ); - } - + } } - apply = rFindDataPartitioningCandidates(n, cand2, vars); + apply = rFindDataPartitioningCandidates(n, cand2, vars, thetaM); if( apply ) partitionedMatrices.putAll(cand2); } @@ -447,7 +455,7 @@ public class OptimizerRuleBased extends Optimizer return blockwise; } - protected boolean rFindDataPartitioningCandidates( OptNode n, HashMap<String, PDataPartitionFormat> cand, LocalVariableMap vars ) + protected boolean rFindDataPartitioningCandidates( OptNode n, HashMap<String, PDataPartitionFormat> cand, LocalVariableMap vars, double thetaM ) throws DMLRuntimeException { boolean ret = false; @@ -456,7 +464,7 @@ public class OptimizerRuleBased extends Optimizer { for( OptNode cn : n.getChilds() ) if( cn.getNodeType() != NodeType.FUNCCALL ) //prevent conflicts with aliases - ret |= rFindDataPartitioningCandidates( cn, cand, vars ); + ret |= rFindDataPartitioningCandidates( cn, cand, vars, thetaM ); } else if( n.getNodeType()== NodeType.HOP && n.getParam(ParamType.OPSTRING).equals(IndexingOp.OPSTRING) ) @@ -470,20 +478,22 @@ public class OptimizerRuleBased extends Optimizer //NOTE: for the moment, we do not partition according to the remote mem, because we can execute //it even without partitioning in CP. 
However, advanced optimizers should reason about this //double mold = h.getMemEstimate(); - if( n.getExecType() == ExecType.MR || n.getExecType()==ExecType.SPARK ) //Opt Condition: MR/Spark - // || (mold > _rm && mnew <= _rm) ) //Opt Condition: non-MR special cases (for remote exec) + if( n.getExecType() == ExecType.MR || n.getExecType()==ExecType.SPARK //Opt Condition: MR/Spark + || h.getMemEstimate() > thetaM ) //Opt Condition: mem estimate > constraint to force partitioning { //NOTE: subsequent rewrites will still use the MR mem estimate //(guarded by subsequent operations that have at least the memory req of one partition) - //if( mnew < _lm ) //apply rewrite if partitions fit into memory - // n.setExecType(ExecType.CP); - //else - // n.setExecType(ExecType.CP); //CP_FILE, but hop still in MR - n.setExecType(ExecType.CP); + n.setExecType(ExecType.CP); //partition ref only (see below) n.addParam(ParamType.DATA_PARTITION_FORMAT, dpf.toString()); h.setMemEstimate( mnew ); //CP vs CP_FILE in ProgramRecompiler bases on mem_estimate ret = true; } + //keep track of nodes that allow conditional data partitioning and their mem + else + { + n.addParam(ParamType.DATA_PARTITION_COND, String.valueOf(true)); + n.addParam(ParamType.DATA_PARTITION_COND_MEM, String.valueOf(mnew)); + } } } @@ -803,7 +813,7 @@ public class OptimizerRuleBased extends Optimizer //REWRITE set execution strategy /// - protected boolean rewriteSetExecutionStategy(OptNode n, double M0, double M, double M2, boolean flagLIX) + protected boolean rewriteSetExecutionStategy(OptNode n, double M0, double M, double M2, double M3, boolean flagLIX) throws DMLRuntimeException { boolean isCPOnly = n.isCPOnly(); @@ -814,26 +824,27 @@ public class OptimizerRuleBased extends Optimizer PDataPartitioner REMOTE_DP = OptimizerUtils.isSparkExecutionMode() ? 
PDataPartitioner.REMOTE_SPARK : PDataPartitioner.REMOTE_MR; //deciding on the execution strategy - if( ConfigurationManager.isParallelParFor() //allowed remote parfor execution - && ( (isCPOnly && M <= _rm ) //Required: all instruction can be be executed in CP - ||(isCPOnlyPossible && M2 <= _rm)) ) //Required: cp inst fit into remote JVM mem + if( ConfigurationManager.isParallelParFor() //allowed remote parfor execution + && ( (isCPOnly && M <= _rm ) //Required: all inst already in cp and fit in remote mem + ||(isCPOnly && M3 <= _rm ) //Required: all inst already in cp and fit partitioned in remote mem + ||(isCPOnlyPossible && M2 <= _rm)) ) //Required: all inst forced to cp fit in remote mem { //at this point all required conditions for REMOTE_MR given, now its an opt decision int cpk = (int) Math.min( _lk, Math.floor( _lm / M ) ); //estimated local exploited par //MR if local par cannot be exploited due to mem constraints (this implies that we work on large data) //(the factor of 2 is to account for hyper-threading and in order prevent too eager remote parfor) - if( 2*cpk < _lk && 2*cpk < _N && 2*cpk < _rk ) + if( 2*cpk < _lk && 2*cpk < _N && 2*cpk < _rk ) //incl conditional partitioning { n.setExecType( REMOTE ); //remote parfor } //MR if problem is large enough and remote parallelism is larger than local - else if( _lk < _N && _lk < _rk && isLargeProblem(n, M0) ) + else if( _lk < _N && _lk < _rk && M <= _rm && isLargeProblem(n, M0) ) { n.setExecType( REMOTE ); //remote parfor } //MR if MR operations in local, but CP only in remote (less overall MR jobs) - else if( (!isCPOnly) && isCPOnlyPossible ) + else if( !isCPOnly && isCPOnlyPossible ) { n.setExecType( REMOTE ); //remote parfor }
