[SYSTEMML-1546] Fix parfor optimizer (result/task partitioning on spark)

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/4c74a343
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/4c74a343
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/4c74a343

Branch: refs/heads/master
Commit: 4c74a34349bd4eeb0f4e102db7bca1f09b2ced97
Parents: 30d4b1e
Author: Matthias Boehm <[email protected]>
Authored: Wed Apr 19 00:12:03 2017 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Wed Apr 19 00:12:03 2017 -0700

----------------------------------------------------------------------
 .../parfor/opt/OptimizerConstrained.java        |  2 +-
 .../parfor/opt/OptimizerRuleBased.java          | 44 +++++++++++---------
 2 files changed, 25 insertions(+), 21 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/4c74a343/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
index 235b927..fb83fd6 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
@@ -131,7 +131,7 @@ public class OptimizerConstrained extends OptimizerRuleBased
             boolean flagRecompMR = rewriteSetExecutionStategy( pn, M0a, M1, M2, M3, flagLIX );
 
             //exec-type-specific rewrites
-            if( pn.getExecType() == ExecType.MR || pn.getExecType() == ExecType.SPARK )
+            if( pn.getExecType() == getRemoteExecType() )
             {
                 if( M1 > _rm && M3 <= _rm ) {
                     // rewrite 1: data partitioning (apply conditional partitioning)

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/4c74a343/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java index f0eb0e7..0ff7a31 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java @@ -262,7 +262,7 @@ public class OptimizerRuleBased extends Optimizer boolean flagRecompMR = rewriteSetExecutionStategy( pn, M0a, M1, M2, M3, flagLIX ); //exec-type-specific rewrites - if( pn.getExecType() == ExecType.MR || pn.getExecType()==ExecType.SPARK ) + if( pn.getExecType() == getRemoteExecType() ) { if( M1 > _rm && M3 <= _rm ) { // rewrite 1: data partitioning (apply conditional partitioning) @@ -400,6 +400,10 @@ public class OptimizerRuleBased extends Optimizer _rkmax2 = (int) Math.ceil( PAR_K_FACTOR * _rk2 ); } + protected ExecType getRemoteExecType() { + return OptimizerUtils.isSparkExecutionMode() ? ExecType.SPARK : ExecType.MR; + } + /////// //REWRITE set data partitioner /// @@ -483,7 +487,7 @@ public class OptimizerRuleBased extends Optimizer //NOTE: for the moment, we do not partition according to the remote mem, because we can execute //it even without partitioning in CP. 
However, advanced optimizers should reason about this //double mold = h.getMemEstimate(); - if( n.getExecType() == ExecType.MR || n.getExecType()==ExecType.SPARK //Opt Condition: MR/Spark + if( n.getExecType() == getRemoteExecType() //Opt Condition: MR/Spark || h.getMemEstimate() > thetaM ) //Opt Condition: mem estimate > constraint to force partitioning { //NOTE: subsequent rewrites will still use the MR mem estimate @@ -608,23 +612,22 @@ public class OptimizerRuleBased extends Optimizer ParForProgramBlock pfpb = (ParForProgramBlock) o[1]; //search for candidates - Collection<OptNode> cand = n.getNodeList(ExecType.MR); + Collection<OptNode> cand = n.getNodeList(getRemoteExecType()); //determine if applicable - boolean apply = M < _rm //ops fit in remote memory budget - && !cand.isEmpty() //at least one MR - && isResultPartitionableAll(cand,pfpb.getResultVariables(),vars, pfpb.getIterablePredicateVars()[0]); // check candidates + boolean apply = M < _rm //ops fit in remote memory budget + && !cand.isEmpty() //at least one MR + && isResultPartitionableAll(cand,pfpb.getResultVariables(), + vars, pfpb.getIterablePredicateVars()[0]); // check candidates //recompile LIX if( apply ) { - try - { + try { for(OptNode lix : cand) recompileLIX( lix, vars ); } - catch(Exception ex) - { + catch(Exception ex) { throw new DMLRuntimeException("Unable to recompile LIX.", ex); } } @@ -827,8 +830,9 @@ public class OptimizerRuleBased extends Optimizer boolean isCPOnlyPossible = isCPOnly || isCPOnlyPossible(n, _rm); String datapartitioner = n.getParam(ParamType.DATA_PARTITIONER); - ExecType REMOTE = OptimizerUtils.isSparkExecutionMode() ? ExecType.SPARK : ExecType.MR; - PDataPartitioner REMOTE_DP = OptimizerUtils.isSparkExecutionMode() ? PDataPartitioner.REMOTE_SPARK : PDataPartitioner.REMOTE_MR; + ExecType REMOTE = getRemoteExecType(); + PDataPartitioner REMOTE_DP = OptimizerUtils.isSparkExecutionMode() ? 
+ PDataPartitioner.REMOTE_SPARK : PDataPartitioner.REMOTE_MR; //deciding on the execution strategy if( ConfigurationManager.isParallelParFor() //allowed remote parfor execution @@ -906,7 +910,7 @@ public class OptimizerRuleBased extends Optimizer ExecType et = n.getExecType(); boolean ret = ( et == ExecType.CP); - if( n.isLeaf() && (et == ExecType.MR || et == ExecType.SPARK) ) + if( n.isLeaf() && et == getRemoteExecType() ) { Hop h = OptTreeConverter.getAbstractPlanMapping().getMappedHop( n.getID() ); if( h.getForcedExecType()!=LopProperties.ExecType.MR //e.g., -exec=hadoop @@ -1156,7 +1160,7 @@ public class OptimizerRuleBased extends Optimizer .getAbstractPlanMapping().getMappedProg(n.getID())[1]; //decide on the replication factor - if( n.getExecType()==ExecType.MR || n.getExecType()==ExecType.SPARK ) + if( n.getExecType()==getRemoteExecType() ) { apply = true; @@ -1417,7 +1421,8 @@ public class OptimizerRuleBased extends Optimizer { setTaskPartitioner( pn, PTaskPartitioner.FACTORING_CMAX ); } - else if( pn.getExecType()==ExecType.MR && !jvmreuse && pn.hasOnlySimpleChilds() ) + else if( ((pn.getExecType()==ExecType.MR && !jvmreuse) + || pn.getExecType()==ExecType.SPARK) && pn.hasOnlySimpleChilds() ) { //for simple body programs without loops, branches, or function calls, we don't //expect much load imbalance and hence use static partitioning in order to @@ -2931,7 +2936,7 @@ public class OptimizerRuleBased extends Optimizer PResultMerge ret = null; //investigate details of current parfor node - boolean flagRemoteParFOR = (n.getExecType() == ExecType.MR || n.getExecType() == ExecType.SPARK); + boolean flagRemoteParFOR = (n.getExecType() == getRemoteExecType()); boolean flagLargeResult = hasLargeTotalResults( n, pfpb.getResultVariables(), vars, true ); boolean flagRemoteLeftIndexing = hasResultMRLeftIndexing( n, pfpb.getResultVariables(), vars, true ); boolean flagCellFormatWoCompare = determineFlagCellFormatWoCompare(pfpb.getResultVariables(), vars); @@ -3015,8 
+3020,8 @@ public class OptimizerRuleBased extends Optimizer { String opName = n.getParam(ParamType.OPSTRING); //check opstring and exec type - if( opName !=null && opName.equals(LeftIndexingOp.OPSTRING) && - (n.getExecType() == ExecType.MR || n.getExecType() == ExecType.SPARK) ) + if( opName != null && opName.equals(LeftIndexingOp.OPSTRING) + && n.getExecType() == getRemoteExecType() ) { LeftIndexingOp hop = (LeftIndexingOp) OptTreeConverter.getAbstractPlanMapping().getMappedHop(n.getID()); //check agains set of varname @@ -3151,7 +3156,7 @@ public class OptimizerRuleBased extends Optimizer if( n.getNodeType() == NodeType.PARFOR ) { rewriteSetResultMerge(n, vars, inLocal); - if( n.getExecType()==ExecType.MR || n.getExecType()==ExecType.SPARK ) + if( n.getExecType()==getRemoteExecType() ) inLocal = false; } else if( n.getChilds()!=null ) @@ -3493,7 +3498,6 @@ public class OptimizerRuleBased extends Optimizer return count; } - //////////////////////// // Helper methods // ////////////////////////
