[SYSTEMML-1546] Fix parfor optimizer (result/task partitioning on spark)

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/4c74a343
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/4c74a343
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/4c74a343

Branch: refs/heads/master
Commit: 4c74a34349bd4eeb0f4e102db7bca1f09b2ced97
Parents: 30d4b1e
Author: Matthias Boehm <[email protected]>
Authored: Wed Apr 19 00:12:03 2017 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Wed Apr 19 00:12:03 2017 -0700

----------------------------------------------------------------------
 .../parfor/opt/OptimizerConstrained.java        |  2 +-
 .../parfor/opt/OptimizerRuleBased.java          | 44 +++++++++++---------
 2 files changed, 25 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/4c74a343/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
index 235b927..fb83fd6 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
@@ -131,7 +131,7 @@ public class OptimizerConstrained extends OptimizerRuleBased
                boolean flagRecompMR = rewriteSetExecutionStategy( pn, M0a, M1, M2, M3, flagLIX );
 
                //exec-type-specific rewrites
-               if( pn.getExecType() == ExecType.MR || pn.getExecType() == ExecType.SPARK )
+               if( pn.getExecType() == getRemoteExecType() )
                {
                        if( M1 > _rm && M3 <= _rm  ) {
                                // rewrite 1: data partitioning (apply 
conditional partitioning)

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/4c74a343/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
index f0eb0e7..0ff7a31 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
@@ -262,7 +262,7 @@ public class OptimizerRuleBased extends Optimizer
                boolean flagRecompMR = rewriteSetExecutionStategy( pn, M0a, M1, M2, M3, flagLIX );
                
                //exec-type-specific rewrites
-               if( pn.getExecType() == ExecType.MR || pn.getExecType()==ExecType.SPARK )
+               if( pn.getExecType() == getRemoteExecType() )
                {
                        if( M1 > _rm && M3 <= _rm  ) {
                                // rewrite 1: data partitioning (apply 
conditional partitioning)
@@ -400,6 +400,10 @@ public class OptimizerRuleBased extends Optimizer
                _rkmax2  = (int) Math.ceil( PAR_K_FACTOR * _rk2 ); 
        }
        
+       protected ExecType getRemoteExecType() {
+               return OptimizerUtils.isSparkExecutionMode() ? ExecType.SPARK : ExecType.MR;
+       }
+       
        ///////
        //REWRITE set data partitioner
        ///
@@ -483,7 +487,7 @@ public class OptimizerRuleBased extends Optimizer
                                //NOTE: for the moment, we do not partition 
according to the remote mem, because we can execute 
                                //it even without partitioning in CP. However, 
advanced optimizers should reason about this                                    
    
                                //double mold = h.getMemEstimate();
-                               if(        n.getExecType() == ExecType.MR ||  
n.getExecType()==ExecType.SPARK  //Opt Condition: MR/Spark
+                               if(        n.getExecType() == 
getRemoteExecType()  //Opt Condition: MR/Spark
                                        || h.getMemEstimate() > thetaM ) //Opt 
Condition: mem estimate > constraint to force partitioning       
                                {
                                        //NOTE: subsequent rewrites will still 
use the MR mem estimate
@@ -608,23 +612,22 @@ public class OptimizerRuleBased extends Optimizer
                ParForProgramBlock pfpb = (ParForProgramBlock) o[1];
                
                //search for candidates
-               Collection<OptNode> cand = n.getNodeList(ExecType.MR);
+               Collection<OptNode> cand = n.getNodeList(getRemoteExecType());
                
                //determine if applicable
-               boolean apply =    M < _rm         //ops fit in remote memory 
budget
-                                   && !cand.isEmpty() //at least one MR
-                               && 
isResultPartitionableAll(cand,pfpb.getResultVariables(),vars, 
pfpb.getIterablePredicateVars()[0]); // check candidates
+               boolean apply = M < _rm   //ops fit in remote memory budget
+                       && !cand.isEmpty()    //at least one MR
+                   && isResultPartitionableAll(cand,pfpb.getResultVariables(), 
+                               vars, pfpb.getIterablePredicateVars()[0]); // 
check candidates
                        
                //recompile LIX
                if( apply )
                {
-                       try
-                       {
+                       try {
                                for(OptNode lix : cand)
                                        recompileLIX( lix, vars );
                        }
-                       catch(Exception ex)
-                       {
+                       catch(Exception ex) {
                                throw new DMLRuntimeException("Unable to 
recompile LIX.", ex);
                        }
                }
@@ -827,8 +830,9 @@ public class OptimizerRuleBased extends Optimizer
                boolean isCPOnlyPossible = isCPOnly || isCPOnlyPossible(n, _rm);
 
                String datapartitioner = n.getParam(ParamType.DATA_PARTITIONER);
-               ExecType REMOTE = OptimizerUtils.isSparkExecutionMode() ? ExecType.SPARK : ExecType.MR;
-               PDataPartitioner REMOTE_DP = OptimizerUtils.isSparkExecutionMode() ? PDataPartitioner.REMOTE_SPARK : PDataPartitioner.REMOTE_MR;
+               ExecType REMOTE = getRemoteExecType();
+               PDataPartitioner REMOTE_DP = OptimizerUtils.isSparkExecutionMode() ? 
+                       PDataPartitioner.REMOTE_SPARK : PDataPartitioner.REMOTE_MR;
 
                //deciding on the execution strategy
                if( ConfigurationManager.isParallelParFor()  //allowed remote 
parfor execution
@@ -906,7 +910,7 @@ public class OptimizerRuleBased extends Optimizer
                ExecType et = n.getExecType();
                boolean ret = ( et == ExecType.CP);             
                
-               if( n.isLeaf() && (et == ExecType.MR || et == ExecType.SPARK) )
+               if( n.isLeaf() && et == getRemoteExecType() )
                {
                        Hop h = 
OptTreeConverter.getAbstractPlanMapping().getMappedHop( n.getID() );
                        if(    h.getForcedExecType()!=LopProperties.ExecType.MR 
 //e.g., -exec=hadoop
@@ -1156,7 +1160,7 @@ public class OptimizerRuleBased extends Optimizer
                                                                
.getAbstractPlanMapping().getMappedProg(n.getID())[1];
                
                //decide on the replication factor 
-               if( n.getExecType()==ExecType.MR || 
n.getExecType()==ExecType.SPARK )           
+               if( n.getExecType()==getRemoteExecType() )              
                {
                        apply = true;
                        
@@ -1417,7 +1421,8 @@ public class OptimizerRuleBased extends Optimizer
                {
                        setTaskPartitioner( pn, PTaskPartitioner.FACTORING_CMAX 
);
                }
-               else if( pn.getExecType()==ExecType.MR && !jvmreuse && pn.hasOnlySimpleChilds() )
+               else if( ((pn.getExecType()==ExecType.MR && !jvmreuse) 
+                       || pn.getExecType()==ExecType.SPARK) && pn.hasOnlySimpleChilds() )
                {
                        //for simple body programs without loops, branches, or 
function calls, we don't
                        //expect much load imbalance and hence use static 
partitioning in order to
@@ -2931,7 +2936,7 @@ public class OptimizerRuleBased extends Optimizer
                PResultMerge ret = null;
                
                //investigate details of current parfor node
-               boolean flagRemoteParFOR = (n.getExecType() == ExecType.MR || n.getExecType() == ExecType.SPARK);
+               boolean flagRemoteParFOR = (n.getExecType() == getRemoteExecType());
                boolean flagLargeResult = hasLargeTotalResults( n, 
pfpb.getResultVariables(), vars, true );
                boolean flagRemoteLeftIndexing = hasResultMRLeftIndexing( n, 
pfpb.getResultVariables(), vars, true );
                boolean flagCellFormatWoCompare = 
determineFlagCellFormatWoCompare(pfpb.getResultVariables(), vars); 
@@ -3015,8 +3020,8 @@ public class OptimizerRuleBased extends Optimizer
                {
                        String opName = n.getParam(ParamType.OPSTRING);
                        //check opstring and exec type
-                       if( opName !=null && 
opName.equals(LeftIndexingOp.OPSTRING) && 
-                               (n.getExecType() == ExecType.MR || 
n.getExecType() == ExecType.SPARK) )
+                       if( opName != null && 
opName.equals(LeftIndexingOp.OPSTRING) 
+                               && n.getExecType() == getRemoteExecType() )
                        {
                                LeftIndexingOp hop = (LeftIndexingOp) 
OptTreeConverter.getAbstractPlanMapping().getMappedHop(n.getID());
                                //check agains set of varname
@@ -3151,7 +3156,7 @@ public class OptimizerRuleBased extends Optimizer
                        if( n.getNodeType() == NodeType.PARFOR )
                        {
                                rewriteSetResultMerge(n, vars, inLocal);
-                               if( n.getExecType()==ExecType.MR || 
n.getExecType()==ExecType.SPARK )
+                               if( n.getExecType()==getRemoteExecType() )
                                        inLocal = false;
                        }
                        else if( n.getChilds()!=null )  
@@ -3493,7 +3498,6 @@ public class OptimizerRuleBased extends Optimizer
                return count;
        }
        
-       
        ////////////////////////
        //   Helper methods   //
        ////////////////////////

Reply via email to