Repository: systemml
Updated Branches:
  refs/heads/master 09b1533de -> 4c7640b87


[SYSTEMML-1349,2251] Extended parfor optimizer for shared reads

This patch resolves a long-standing shortcoming of the parfor optimizer:
it set the degree of parallelism too low because shared reads were not
handled properly and were thus double counted for each worker. Especially
on modern processors with an increasing number of cores, this has become
more important in order to reduce unnecessary barriers (a too low parfor
degree of parallelism leads to distributing the remaining parallelism to
contained parfor loops and operations, which require barriers per operation).
 

Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/9a089151
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/9a089151
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/9a089151

Branch: refs/heads/master
Commit: 9a089151bf2bc518fe9eaf5b3fe9505f237dc399
Parents: 09b1533
Author: Matthias Boehm <[email protected]>
Authored: Thu Apr 19 18:57:51 2018 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Thu Apr 19 18:57:51 2018 -0700

----------------------------------------------------------------------
 src/main/java/org/apache/sysml/hops/Hop.java    | 48 ++++++++--------
 .../parfor/opt/CostEstimator.java               | 22 ++++---
 .../parfor/opt/CostEstimatorHops.java           |  9 ++-
 .../parfor/opt/OptimizerConstrained.java        |  8 +--
 .../parfor/opt/OptimizerRuleBased.java          | 60 +++++++++++++-------
 5 files changed, 86 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/9a089151/src/main/java/org/apache/sysml/hops/Hop.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/Hop.java 
b/src/main/java/org/apache/sysml/hops/Hop.java
index 9445220..059132d 100644
--- a/src/main/java/org/apache/sysml/hops/Hop.java
+++ b/src/main/java/org/apache/sysml/hops/Hop.java
@@ -20,6 +20,7 @@
 package org.apache.sysml.hops;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 
@@ -459,6 +460,19 @@ public abstract class Hop implements ParseInfo
                return _outputEmptyBlocks;
        }
        
+
+       protected double getInputOutputSize() {
+               return _outputMemEstimate
+                       + _processingMemEstimate
+                       + getInputSize();
+       }
+       
+       public double getInputOutputSize(Collection<String> exclVars) {
+               return _outputMemEstimate
+                       + _processingMemEstimate
+                       + getInputSize(exclVars);
+       }
+       
        /**
         * Returns the memory estimate for the output produced from this Hop.
         * It must be invoked only within Hops. From outside Hops, one must 
@@ -467,21 +481,22 @@ public abstract class Hop implements ParseInfo
         * 
         * @return output size memory estimate
         */
-       protected double getOutputSize() 
-       {
+       protected double getOutputSize() {
                return _outputMemEstimate;
        }
+       
+       protected double getInputSize() {
+               return getInputSize(null);
+       }
 
-       protected double getInputSize() 
-       {
-               double sum = 0;         
+       protected double getInputSize(Collection<String> exclVars) {
+               double sum = 0;
                int len = _input.size();
-               
-               for( int i=0; i<len; i++ ) //for all inputs
-               {
+               for( int i=0; i<len; i++ ) { //for all inputs
                        Hop hi = _input.get(i);
+                       if( exclVars != null && exclVars.contains(hi.getName()) 
)
+                               continue;
                        double hmout = hi.getOutputMemEstimate();
-                       
                        if( hmout > 1024*1024 ) {//for relevant sizes
                                //check if already included in estimate (if an 
input is used
                                //multiple times it is still only required once 
in memory)
@@ -491,24 +506,9 @@ public abstract class Hop implements ParseInfo
                                        flag |= (hi == _input.get(j));
                                hmout = flag ? 0 : hmout;
                        }
-                       
                        sum += hmout;
                }
                
-               //for(Hop h : _input ) {
-               //      sum += h._outputMemEstimate;
-               //}
-               
-               return sum;
-       }
-
-       protected double getInputOutputSize() 
-       {
-               double sum = 0;
-               sum += _outputMemEstimate;
-               sum += _processingMemEstimate;
-               sum += getInputSize();
-               
                return sum;
        }
 

http://git-wip-us.apache.org/repos/asf/systemml/blob/9a089151/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/CostEstimator.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/CostEstimator.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/CostEstimator.java
index 167120f..c726308 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/CostEstimator.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/CostEstimator.java
@@ -26,7 +26,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import org.apache.sysml.lops.LopProperties.ExecType;
-import org.apache.sysml.parser.ParForStatementBlock.ResultVar;
 import org.apache.sysml.runtime.controlprogram.parfor.opt.OptNode.ParamType;
 
 /**
@@ -47,17 +46,20 @@ public abstract class CostEstimator
        public static final double DEFAULT_MEM_ESTIMATE_MR = 20*1024*1024; 
//default memory consumption: 20MB 
 
        public enum TestMeasure {
-               EXEC_TIME,
-               MEMORY_USAGE
+               EXEC_TIME, MEMORY_USAGE
        }
        
        public enum DataFormat {
-               DENSE,
-               SPARSE
+               DENSE, SPARSE
+       }
+       
+       public enum ExcludeType {
+               NONE, SHARED_READ, RESULT_LIX
        }
        
        protected boolean _inclCondPart = false;
-       protected Collection<ResultVar> _exclRetVars = null;
+       protected Collection<String> _exclVars = null;
+       protected ExcludeType _exclType = ExcludeType.NONE;
        
        /**
         * Main leaf node estimation method - to be overwritten by specific 
cost estimators
@@ -102,12 +104,14 @@ public abstract class CostEstimator
                return val;
        }
        
-       public double getEstimate(TestMeasure measure, OptNode node, boolean 
inclCondPart, Collection<ResultVar> retVars) {
+       public double getEstimate(TestMeasure measure, OptNode node, boolean 
inclCondPart, Collection<String> vars, ExcludeType extype) {
                _inclCondPart = inclCondPart; //temporary
-               _exclRetVars = retVars;
+               _exclVars = vars;
+               _exclType = extype;
                double val = getEstimate(measure, node, null);
                _inclCondPart = false; 
-               _exclRetVars = null;
+               _exclVars = null;
+               _exclType = ExcludeType.NONE;
                return val;
        }
        

http://git-wip-us.apache.org/repos/asf/systemml/blob/9a089151/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/CostEstimatorHops.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/CostEstimatorHops.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/CostEstimatorHops.java
index 3cb0c6f..4c3e7f0 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/CostEstimatorHops.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/CostEstimatorHops.java
@@ -24,7 +24,6 @@ import org.apache.sysml.hops.Hop;
 import org.apache.sysml.hops.LeftIndexingOp;
 import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.lops.LopProperties.ExecType;
-import org.apache.sysml.parser.ParForStatementBlock.ResultVar;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.parfor.opt.OptNode.NodeType;
 import 
org.apache.sysml.runtime.controlprogram.parfor.opt.Optimizer.CostModelType;
@@ -61,6 +60,10 @@ public class CostEstimatorHops extends CostEstimator
                Hop h = _map.getMappedHop( node.getID() );
                double value = h.getMemEstimate();
                
+               //correction for disabled shared read accounting
+               value = (_exclVars!=null && _exclType==ExcludeType.SHARED_READ) 
?
+                       h.getInputOutputSize(_exclVars) : value;
+               
                //handle specific cases 
                double DEFAULT_MEM_REMOTE = 
OptimizerUtils.isSparkExecutionMode() ? 
                        DEFAULT_MEM_SP : DEFAULT_MEM_MR;
@@ -100,8 +103,8 @@ public class CostEstimatorHops extends CostEstimator
                }
                
                //correction for disabled result indexing
-               value = (_exclRetVars!=null && h instanceof LeftIndexingOp
-                       && ResultVar.contains(_exclRetVars, h.getName())) ? 0 : 
value;
+               value = (_exclVars!=null && _exclType==ExcludeType.RESULT_LIX 
+                       && h instanceof LeftIndexingOp && 
_exclVars.contains(h.getName())) ? 0 : value;
                
                if( LOG.isTraceEnabled() ) {
                        LOG.trace("Memory estimate "+h.getName()+", 
"+h.getOpString()

http://git-wip-us.apache.org/repos/asf/systemml/blob/9a089151/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
index 4088e63..deedf29 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
@@ -152,7 +152,7 @@ public class OptimizerConstrained extends OptimizerRuleBased
                        super.rewriteSetExportReplicationFactor( pn, 
ec.getVariables() );
 
                        // rewrite 10: determine parallelism
-                       rewriteSetDegreeOfParallelism( pn, M1, false );
+                       rewriteSetDegreeOfParallelism( pn, _cost, 
ec.getVariables(), M1, false );
 
                        // rewrite 11: task partitioning 
                        rewriteSetTaskPartitioner( pn, false, flagLIX );
@@ -174,7 +174,7 @@ public class OptimizerConstrained extends OptimizerRuleBased
                else //if( pn.getExecType() == ExecType.CP )
                {
                        // rewrite 10: determine parallelism
-                       rewriteSetDegreeOfParallelism( pn, M1, false );
+                       rewriteSetDegreeOfParallelism( pn, _cost, 
ec.getVariables(), M1, false );
 
                        // rewrite 11: task partitioning
                        rewriteSetTaskPartitioner( pn, false, false ); 
//flagLIX always false 
@@ -282,7 +282,7 @@ public class OptimizerConstrained extends OptimizerRuleBased
        ///
 
        @Override
-       protected void rewriteSetDegreeOfParallelism(OptNode n, double M, 
boolean flagNested) {
+       protected void rewriteSetDegreeOfParallelism(OptNode n, CostEstimator 
cost, LocalVariableMap vars, double M, boolean flagNested) {
                // constraint awareness
                if( n.getK() > 0 && ConfigurationManager.isParallelParFor() )
                {
@@ -299,7 +299,7 @@ public class OptimizerConstrained extends OptimizerRuleBased
                        LOG.debug(getOptMode()+" OPT: forced 'set degree of 
parallelism' - result=(see EXPLAIN)" );
                }
                else
-                       super.rewriteSetDegreeOfParallelism(n, M, flagNested);
+                       super.rewriteSetDegreeOfParallelism(n, cost, vars, M, 
flagNested);
        }
 
 

http://git-wip-us.apache.org/repos/asf/systemml/blob/9a089151/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
index 2bf8fe5..91124a0 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -81,6 +82,7 @@ import 
org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysml.runtime.controlprogram.parfor.ProgramConverter;
 import org.apache.sysml.runtime.controlprogram.parfor.ResultMergeLocalFile;
+import 
org.apache.sysml.runtime.controlprogram.parfor.opt.CostEstimator.ExcludeType;
 import 
org.apache.sysml.runtime.controlprogram.parfor.opt.CostEstimator.TestMeasure;
 import org.apache.sysml.runtime.controlprogram.parfor.opt.OptNode.ExecType;
 import org.apache.sysml.runtime.controlprogram.parfor.opt.OptNode.NodeType;
@@ -271,7 +273,7 @@ public class OptimizerRuleBased extends Optimizer
                        rewriteSetExportReplicationFactor( pn, 
ec.getVariables() );
                        
                        // rewrite 10: determine parallelism
-                       rewriteSetDegreeOfParallelism( pn, M1, false );
+                       rewriteSetDegreeOfParallelism( pn, _cost, 
ec.getVariables(), M1, false );
                        
                        // rewrite 11: task partitioning 
                        rewriteSetTaskPartitioner( pn, false, flagLIX );
@@ -292,7 +294,7 @@ public class OptimizerRuleBased extends Optimizer
                else //if( pn.getExecType() == ExecType.CP )
                {
                        // rewrite 10: determine parallelism
-                       rewriteSetDegreeOfParallelism( pn, M1, false );
+                       rewriteSetDegreeOfParallelism( pn, _cost, 
ec.getVariables(), M1, false );
                        
                        // rewrite 11: task partitioning
                        rewriteSetTaskPartitioner( pn, false, false ); 
//flagLIX always false 
@@ -1173,25 +1175,36 @@ public class OptimizerRuleBased extends Optimizer
        //REWRITE set degree of parallelism
        ///
 
-       protected void rewriteSetDegreeOfParallelism(OptNode n, double M, 
boolean flagNested) 
+       protected void rewriteSetDegreeOfParallelism(OptNode n, CostEstimator 
cost, LocalVariableMap vars, double M, boolean flagNested) 
        {
                ExecType type = n.getExecType();
                long id = n.getID();
-                               
+               
                //special handling for different exec models (CP, MR, MR nested)
-               ParForProgramBlock pfpb = (ParForProgramBlock) OptTreeConverter
-                                                                               
.getAbstractPlanMapping().getMappedProg(id)[1];
+               Object[] map = 
OptTreeConverter.getAbstractPlanMapping().getMappedProg(id);
+               ParForStatementBlock pfsb = (ParForStatementBlock)map[0];
+               ParForProgramBlock pfpb = (ParForProgramBlock)map[1];
                
                if( type == ExecType.CP ) 
                {
                        //determine local max parallelism constraint
                        int kMax = ConfigurationManager.isParallelParFor() ?
-                                       (n.isCPOnly() ? _lkmaxCP : _lkmaxMR) : 
1;
+                               (n.isCPOnly() ? _lkmaxCP : _lkmaxMR) : 1;
+                       
+                       //compute memory budgets and partial estimates for 
handling shared reads
+                       double mem = (OptimizerUtils.isSparkExecutionMode() && 
!n.isCPOnly()) ? _lm/2 : _lm;
+                       double sharedM = 0, nonSharedM = M;
+                       if( computeMaxK(M, M, 0, mem) < kMax ) { //account for 
shared read if necessary
+                               sharedM = 
pfsb.getReadOnlyParentVars().stream().map(s -> vars.get(s))
+                                       .filter(d -> d instanceof 
MatrixObject).mapToDouble(mo -> OptimizerUtils
+                                       
.estimateSize(((MatrixObject)mo).getMatrixCharacteristics())).sum();
+                               nonSharedM = 
cost.getEstimate(TestMeasure.MEMORY_USAGE, n, true,
+                                       pfsb.getReadOnlyParentVars(), 
ExcludeType.SHARED_READ);
+                       }
                        
                        //ensure local memory constraint (for spark more 
conservative in order to 
                        //prevent unnecessary guarded collect)
-                       double mem = (OptimizerUtils.isSparkExecutionMode() && 
!n.isCPOnly()) ? _lm/2 : _lm;
-                       kMax = Math.min( kMax, (int)Math.floor( mem / M ) );
+                       kMax = Math.min( kMax, computeMaxK(M, nonSharedM, 
sharedM, mem) );
                        kMax = Math.max( kMax, 1);
                        
                        //constrain max parfor parallelism by problem size
@@ -1226,21 +1239,17 @@ public class OptimizerRuleBased extends Optimizer
                else // ExecType.MR/ExecType.SPARK
                {
                        int kMax = -1;
-                       if( flagNested )
-                       {
+                       if( flagNested ) {
                                //determine remote max parallelism constraint
                                pfpb.setDegreeOfParallelism( _rnk ); 
//guaranteed <= _N (see nested)
-                               n.setK( _rnk ); 
-                       
+                               n.setK( _rnk );
                                kMax = _rkmax / _rnk; //per node (CP only 
inside)
                        }
-                       else //not nested (default)
-                       {
+                       else { //not nested (default)
                                //determine remote max parallelism constraint
                                int tmpK = (int)((_N<_rk)? _N : _rk);
                                pfpb.setDegreeOfParallelism(tmpK);
-                               n.setK(tmpK);   
-                               
+                               n.setK(tmpK);
                                kMax = _rkmax / tmpK; //per node (CP only 
inside)
                        }
                        
@@ -1252,14 +1261,22 @@ public class OptimizerRuleBased extends Optimizer
                        //disable nested parallelism, if required
                        if( !ALLOW_REMOTE_NESTED_PARALLELISM )
                                kMax = 1;
-                                       
+                       
                        //distribute remaining parallelism and recompile 
parallel instructions
-                       rAssignRemainingParallelism( n, kMax, 1 ); 
-               }               
+                       rAssignRemainingParallelism( n, kMax, 1 );
+               }
                
                _numEvaluatedPlans++;
                LOG.debug(getOptMode()+" OPT: rewrite 'set degree of 
parallelism' - result=(see EXPLAIN)" );
        }
+       
+       /**
+        * Computes the maximum degree of parallelism that fits into the given
+        * memory budget, both without and with shared-read accounting.
+        *
+        * @param M memory estimate per worker (shared reads counted per worker)
+        * @param memNonShared memory estimate per worker excluding shared reads
+        * @param memShared memory required once for all shared reads
+        * @param memBudget available memory budget
+        * @return the larger of the two candidate degrees of parallelism
+        */
+       private int computeMaxK(double M, double memNonShared, double memShared, double memBudget) {
+               //note: we compute max K for both w/o and w/ shared reads and take the max, because
+               //the latter might reduce the degree of parallelism if shared reads don't dominate
+               int k1 = (int)Math.floor(memBudget / M);
+               //shared reads are held once, so subtract them from the budget before dividing
+               //the remainder among workers (parentheses required: '/' binds tighter than '-')
+               int k2 = (int)Math.floor((memBudget-memShared) / memNonShared);
+               return Math.max(k1, k2);
+       }
 
        protected void rAssignRemainingParallelism(OptNode n, int parforK, int 
opsK) 
        {
@@ -1644,7 +1661,8 @@ public class OptimizerRuleBased extends Optimizer
                        double sum = computeTotalSizeResultVariables(retVars, 
vars, pfpb.getDegreeOfParallelism());
                
                        //compute memory estimate without result indexing, and 
total sum per worker
-                       double M = cost.getEstimate(TestMeasure.MEMORY_USAGE, 
pn, true, retVars);
+                       double M = cost.getEstimate(TestMeasure.MEMORY_USAGE, 
pn, true, retVars.stream()
+                               .map(var -> 
var._name).collect(Collectors.toList()), ExcludeType.RESULT_LIX);
                        totalMem = M + sum;
                        
                        //result update in-place for MR/Spark (w/ remote memory 
constraint)

Reply via email to