[SYSTEMML-2039] Rework instruction generation for memory-efficiency

This patch makes a fundamental change to the instruction generation for CP and SPARK execution types. So far, we sorted lops by level and ID, leading to breadth-first instruction scheduling; for large DAGs, this led to an unnecessarily large number of live intermediates, causing OOMs in JMLC, unnecessary evictions in normal execution, and poor data locality. The reasons for this breadth-first instruction scheduling were to ensure that transient writes do not overwrite transient reads, and to support vertical piggybacking for MR jobs.
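For illustration, the previous breadth-first ordering corresponds roughly to a level-then-ID comparator, as in the following minimal Java sketch (BreadthFirstOrderSketch, its Node class, and the toy DAG are hypothetical stand-ins, not the actual Lop/LopComparator classes):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class BreadthFirstOrderSketch {
  // hypothetical stand-in for a lop: only the fields needed for ordering
  static final class Node {
    final long id; final int level;
    Node(long id, int level) { this.id = id; this.level = level; }
    @Override public String toString() { return "lop-" + id + "@L" + level; }
  }

  public static void main(String[] args) {
    // toy DAG: two independent chains of depth 3, IDs assigned per chain
    List<Node> nodes = new ArrayList<>(Arrays.asList(
      new Node(1, 0), new Node(2, 1), new Node(3, 2),   // chain A
      new Node(4, 0), new Node(5, 1), new Node(6, 2))); // chain B

    // old scheduling: sort by (level, ID), i.e., all level-0 lops first, then all level-1 lops, ...
    // the two chains get interleaved, so intermediates of both stay live at the same time
    nodes.sort(Comparator.comparingInt((Node n) -> n.level).thenComparingLong(n -> n.id));
    System.out.println(nodes); // [lop-1@L0, lop-4@L0, lop-2@L1, lop-5@L1, lop-3@L2, lop-6@L2]
  }
}

With this ordering, intermediates of independent chains are interleaved, which is exactly what inflates the number of live intermediates on large DAGs.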
The new approach for CP and SPARK uses a simple two-level partitioning, where the lops of a DAG are partitioned into leaves/intermediates and outputs. All leaves/intermediates are scheduled in a depth-first manner, followed by all outputs. There is room for even better scheduling (after the 1.0 release), but this patch aims for minimal changes to the core instruction generation while removing the main bottleneck. For a complex JMLC application, this patch reduced the maximum memory requirements by more than 3x.

Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/5d4ad5b8
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/5d4ad5b8
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/5d4ad5b8

Branch: refs/heads/master
Commit: 5d4ad5b8e18c14cf552a163057ae8bf3bd4f593e
Parents: c965a88
Author: Matthias Boehm <[email protected]>
Authored: Thu Dec 7 00:56:47 2017 -0800
Committer: Matthias Boehm <[email protected]>
Committed: Thu Dec 7 13:36:14 2017 -0800

----------------------------------------------------------------------
 .../org/apache/sysml/hops/ConvolutionOp.java    | 23 +++---
 .../java/org/apache/sysml/hops/ReorgOp.java     | 60 ++++++----------
 .../apache/sysml/lops/ConvolutionTransform.java |  4 ++
 src/main/java/org/apache/sysml/lops/Lop.java    |  2 +-
 .../java/org/apache/sysml/lops/Transform.java   | 53 ++++++++------
 .../java/org/apache/sysml/lops/compile/Dag.java | 76 ++++++++++----------
 6 files changed, 110 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/5d4ad5b8/src/main/java/org/apache/sysml/hops/ConvolutionOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/ConvolutionOp.java b/src/main/java/org/apache/sysml/hops/ConvolutionOp.java
index 16a8b63..bdd3fa8 100644
--- a/src/main/java/org/apache/sysml/hops/ConvolutionOp.java
+++ b/src/main/java/org/apache/sysml/hops/ConvolutionOp.java
@@ -266,10 +266,12 @@ public class ConvolutionOp extends Hop implements MultiThreadedHop
     // ---------------------------------------------------------------
     // Contruct the lop
-    ConvolutionTransform convolutionLop = new ConvolutionTransform(lhsInputLop, lopOp,
+    Lop optionalMaxPoolOutput = (et == ExecType.GPU) ?
getMaxPoolOutputLop() : null; + Lop[] l2inputs = new Lop[inputsOfPotentiallyFusedOp.size()-1]; + for( int i=1; i < inputsOfPotentiallyFusedOp.size(); i++ ) + l2inputs[i-1] = inputsOfPotentiallyFusedOp.get(i).constructLops(); + ConvolutionTransform convolutionLop = new ConvolutionTransform(lhsInputLop, lopOp, getDataType(), getValueType(), et, OptimizerUtils.getConstrainedNumThreads(_maxNumThreads), intermediateMemEstimate); - - // Propagate the output dimensions and the line number of ConvolutionOp to ConvolutionTransform setOutputDimensions(convolutionLop); setLineNumbers(convolutionLop); @@ -280,19 +282,22 @@ public class ConvolutionOp extends Hop implements MultiThreadedHop convolutionLop.addInput(optionalRhsInputLop); optionalRhsInputLop.addOutput(convolutionLop); } - for( int i=1; i < inputsOfPotentiallyFusedOp.size(); i++ ) { - Lop ltmp = inputsOfPotentiallyFusedOp.get(i).constructLops(); - convolutionLop.addInput(ltmp); - ltmp.addOutput(convolutionLop); + for( int i=0; i < l2inputs.length; i++ ) { + convolutionLop.addInput(l2inputs[i]); + l2inputs[i].addOutput(convolutionLop); } // Only valid for MAX_POOLING_BACKWARD on GPU - Lop optionalMaxPoolOutput = (et == ExecType.GPU) ? getMaxPoolOutputLop() : null; if(optionalMaxPoolOutput != null) { convolutionLop.addInput(optionalMaxPoolOutput); optionalMaxPoolOutput.addOutput(convolutionLop); } - convolutionLop.setLevel(); //force order of added lops + convolutionLop.updateLopProperties(); + + // TODO double check that optionalMaxPoolOutput adheres to proper + // ID ordering of constructed lops (previously hidden by setLevel) + // --------------------------------------------------------------- + return convolutionLop; } http://git-wip-us.apache.org/repos/asf/systemml/blob/5d4ad5b8/src/main/java/org/apache/sysml/hops/ReorgOp.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/ReorgOp.java b/src/main/java/org/apache/sysml/hops/ReorgOp.java index 4d29336..fc85b33 100644 --- a/src/main/java/org/apache/sysml/hops/ReorgOp.java +++ b/src/main/java/org/apache/sysml/hops/ReorgOp.java @@ -226,29 +226,24 @@ public class ReorgOp extends Hop implements MultiThreadedHop } case RESHAPE: { + Lop[] linputs = new Lop[4]; //main, rows, cols, byrow + for( int i=0; i<4; i++ ) + linputs[i] = getInput().get(i).constructLops(); + if( et==ExecType.MR ) { - Transform transform1 = new Transform( getInput().get(0).constructLops(), - HopsTransf2Lops.get(op), getDataType(), getValueType(), et); + Transform transform1 = new Transform( linputs, + HopsTransf2Lops.get(op), getDataType(), getValueType(), et); setOutputDimensions(transform1); setLineNumbers(transform1); - for( int i=1; i<=3; i++ ) //rows, cols, byrow - { - Lop ltmp = getInput().get(i).constructLops(); - transform1.addInput(ltmp); - ltmp.addOutput(transform1); - } - transform1.setLevel(); //force order of added lops - Group group1 = new Group( - transform1, Group.OperationTypes.Sort, DataType.MATRIX, - getValueType()); + Group group1 = new Group(transform1, + Group.OperationTypes.Sort, DataType.MATRIX, getValueType()); setOutputDimensions(group1); setLineNumbers(group1); - Aggregate agg1 = new Aggregate( - group1, Aggregate.OperationTypes.Sum, DataType.MATRIX, - getValueType(), et); + Aggregate agg1 = new Aggregate(group1, + Aggregate.OperationTypes.Sum, DataType.MATRIX, getValueType(), et); setOutputDimensions(agg1); setLineNumbers(agg1); @@ -256,19 +251,11 @@ public class ReorgOp extends Hop implements MultiThreadedHop } else 
//CP/SPARK { - Transform transform1 = new Transform( getInput().get(0).constructLops(), - HopsTransf2Lops.get(op), getDataType(), getValueType(), et); + Transform transform1 = new Transform( linputs, + HopsTransf2Lops.get(op), getDataType(), getValueType(), et); setOutputDimensions(transform1); setLineNumbers(transform1); - for( int i=1; i<=3; i++ ) //rows, cols, byrow - { - Lop ltmp = getInput().get(i).constructLops(); - transform1.addInput(ltmp); - ltmp.addOutput(transform1); - } - transform1.setLevel(); //force order of added lops - setLops(transform1); } break; @@ -402,23 +389,16 @@ public class ReorgOp extends Hop implements MultiThreadedHop private static Lop constructCPOrSparkSortLop( Hop input, Hop by, Hop desc, Hop ixret, ExecType et, boolean bSortIndInMem ) throws HopsException, LopsException { - Transform transform1 = new Transform( input.constructLops(), HopsTransf2Lops.get(ReOrgOp.SORT), - input.getDataType(), input.getValueType(), et, bSortIndInMem); - - for( Hop c : new Hop[]{by,desc,ixret} ) { - Lop ltmp = c.constructLops(); - transform1.addInput(ltmp); - ltmp.addOutput(transform1); - } - - transform1.setLevel(); //force order of added lops - - return transform1; + Hop[] hinputs = new Hop[]{input, by, desc, ixret}; + Lop[] linputs = new Lop[4]; + for( int i=0; i<4; i++ ) + linputs[i] = hinputs[i].constructLops(); + return new Transform( linputs, HopsTransf2Lops.get(ReOrgOp.SORT), + input.getDataType(), input.getValueType(), et, bSortIndInMem); } - + @Override - protected double computeOutputMemEstimate( long dim1, long dim2, long nnz ) - { + protected double computeOutputMemEstimate( long dim1, long dim2, long nnz ) { //no dedicated mem estimation per op type, because always propagated via refreshSizeInformation double sparsity = OptimizerUtils.getSparsity(dim1, dim2, nnz); return OptimizerUtils.estimateSizeExactSparsity(dim1, dim2, sparsity); http://git-wip-us.apache.org/repos/asf/systemml/blob/5d4ad5b8/src/main/java/org/apache/sysml/lops/ConvolutionTransform.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/lops/ConvolutionTransform.java b/src/main/java/org/apache/sysml/lops/ConvolutionTransform.java index 94a67f0..68b3b21 100644 --- a/src/main/java/org/apache/sysml/lops/ConvolutionTransform.java +++ b/src/main/java/org/apache/sysml/lops/ConvolutionTransform.java @@ -101,6 +101,10 @@ public class ConvolutionTransform extends Lop lps.setProperties( inputs, et, ExecLocation.ControlProgram, breaksAlignment, aligner, definesMRJob ); } } + + public void updateLopProperties() { + lps.setLevel(inputs); + } @Override public String toString() { http://git-wip-us.apache.org/repos/asf/systemml/blob/5d4ad5b8/src/main/java/org/apache/sysml/lops/Lop.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/lops/Lop.java b/src/main/java/org/apache/sysml/lops/Lop.java index 8909ea4..e105362 100644 --- a/src/main/java/org/apache/sysml/lops/Lop.java +++ b/src/main/java/org/apache/sysml/lops/Lop.java @@ -339,7 +339,7 @@ public abstract class Lop return lps.getLevel(); } - public void setLevel() { + protected void setLevel() { lps.setLevel(inputs); } http://git-wip-us.apache.org/repos/asf/systemml/blob/5d4ad5b8/src/main/java/org/apache/sysml/lops/Transform.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/lops/Transform.java b/src/main/java/org/apache/sysml/lops/Transform.java index 
919d16c..07cc91e 100644 --- a/src/main/java/org/apache/sysml/lops/Transform.java +++ b/src/main/java/org/apache/sysml/lops/Transform.java @@ -43,44 +43,53 @@ public class Transform extends Lop private OperationTypes operation = null; private boolean _bSortIndInMem = false; private int _numThreads = 1; - - /** - * Constructor when we have one input. - * - * @param input low-level operator - * @param op transform operation type - * @param dt data type - * @param vt value type - * @param et execution type - */ + public Transform(Lop input, Transform.OperationTypes op, DataType dt, ValueType vt, ExecType et) { - this(input, op, dt, vt, et, 1); + this(input, op, dt, vt, et, 1); + } + + public Transform(Lop[] inputs, Transform.OperationTypes op, DataType dt, ValueType vt, ExecType et) { + this(inputs, op, dt, vt, et, 1); } public Transform(Lop input, Transform.OperationTypes op, DataType dt, ValueType vt, ExecType et, int k) { - super(Lop.Type.Transform, dt, vt); - init(input, op, dt, vt, et); + super(Lop.Type.Transform, dt, vt); + init(new Lop[]{input}, op, dt, vt, et); + _numThreads = k; + } + + public Transform(Lop[] inputs, Transform.OperationTypes op, DataType dt, ValueType vt, ExecType et, int k) { + super(Lop.Type.Transform, dt, vt); + init(inputs, op, dt, vt, et); _numThreads = k; } public Transform(Lop input, Transform.OperationTypes op, DataType dt, ValueType vt) { - super(Lop.Type.Transform, dt, vt); - init(input, op, dt, vt, ExecType.MR); + super(Lop.Type.Transform, dt, vt); + init(new Lop[]{input}, op, dt, vt, ExecType.MR); } public Transform(Lop input, Transform.OperationTypes op, DataType dt, ValueType vt, ExecType et, boolean bSortIndInMem) { - super(Lop.Type.Transform, dt, vt); + super(Lop.Type.Transform, dt, vt); + _bSortIndInMem = bSortIndInMem; + init(new Lop[]{input}, op, dt, vt, et); + } + + public Transform(Lop[] inputs, Transform.OperationTypes op, DataType dt, ValueType vt, ExecType et, boolean bSortIndInMem) { + super(Lop.Type.Transform, dt, vt); _bSortIndInMem = bSortIndInMem; - init(input, op, dt, vt, et); + init(inputs, op, dt, vt, et); } - private void init (Lop input, Transform.OperationTypes op, DataType dt, ValueType vt, ExecType et) + private void init (Lop[] input, Transform.OperationTypes op, DataType dt, ValueType vt, ExecType et) { operation = op; - - this.addInput(input); - input.addOutput(this); - + + for(Lop in : input) { + this.addInput(in); + in.addOutput(this); + } + boolean breaksAlignment = true; boolean aligner = false; boolean definesMRJob = false; http://git-wip-us.apache.org/repos/asf/systemml/blob/5d4ad5b8/src/main/java/org/apache/sysml/lops/compile/Dag.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/lops/compile/Dag.java b/src/main/java/org/apache/sysml/lops/compile/Dag.java index 9306a24..0326556 100644 --- a/src/main/java/org/apache/sysml/lops/compile/Dag.java +++ b/src/main/java/org/apache/sysml/lops/compile/Dag.java @@ -22,9 +22,11 @@ package org.apache.sysml.lops.compile; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.stream.Stream; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -80,7 +82,6 @@ import org.apache.sysml.runtime.matrix.data.OutputInfo; import org.apache.sysml.runtime.matrix.sort.PickFromCompactInputFormat; - /** * * Class to maintain a DAG of lops and 
compile it into @@ -113,7 +114,7 @@ public class Dag<N extends Lop> var_index = new IDSequence(); } - // hash set for all nodes in dag + // list of all nodes in the dag private ArrayList<Lop> nodes = null; /* @@ -178,11 +179,10 @@ public class Dag<N extends Lop> } public void addLastInstruction(Instruction inst) { lastInstructions.add(inst); - } + } } - public Dag() - { + public Dag() { //allocate internal data structures nodes = new ArrayList<>(); IDMap = new HashMap<>(); @@ -246,25 +246,16 @@ public class Dag<N extends Lop> */ public ArrayList<Instruction> getJobs(StatementBlock sb, DMLConfig config) throws LopsException, IOException, DMLRuntimeException { - - if (config != null) - { + if (config != null) { total_reducers = config.getIntValue(DMLConfig.NUM_REDUCERS); scratch = config.getTextValue(DMLConfig.SCRATCH_SPACE) + "/"; } - // hold all nodes in a vector (needed for ordering) - ArrayList<Lop> node_v = new ArrayList<>(); - node_v.addAll(nodes); - - /* - * Sort the nodes by topological order. - * - * 1) All nodes with level i appear prior to the nodes in level i+1. - * 2) All nodes within a level are ordered by their ID i.e., in the order - * they are created - */ - doTopologicalSort_strict_order(node_v); + // create ordering of lops (for MR, we sort by level, while for all + // other exec types we use a two-level sorting of ) + ArrayList<Lop> node_v = OptimizerUtils.isHadoopExecutionMode() ? + doTopologicalSortStrictOrder(nodes) : + doTopologicalSortTwoLevelOrder(nodes); // do greedy grouping of operations ArrayList<Instruction> inst = doGreedyGrouping(sb, node_v); @@ -3607,14 +3598,16 @@ public class Dag<N extends Lop> } /** - * Method to topologically sort lops + * Sort the lops by topological order. + * + * 1) All nodes with level i appear prior to the nodes in level i+1. + * 2) All nodes within a level are ordered by their ID i.e., in the order + * they are created * * @param v list of lops */ @SuppressWarnings({ "unchecked", "rawtypes" }) - private void doTopologicalSort_strict_order(ArrayList<Lop> v) { - //int numNodes = v.size(); - + private ArrayList<Lop> doTopologicalSortStrictOrder(ArrayList<Lop> v) { /* * Step 1: compute the level for each node in the DAG. Level for each node is * computed as lops are created. So, this step is need not be performed here. 
@@ -3627,13 +3620,28 @@ public class Dag<N extends Lop> Lop[] nodearray = v.toArray(new Lop[0]); Arrays.sort(nodearray, new LopComparator()); + return createIDMapping(nodearray); + } + + private ArrayList<Lop> doTopologicalSortTwoLevelOrder(ArrayList<Lop> v) { + //partition nodes into leaf/inner nodes and dag root nodes, + //sort leaf/inner nodes by ID to force depth-first scheduling + Lop[] nodearray = Stream.concat( + v.stream().filter(l -> !l.getOutputs().isEmpty()).sorted(Comparator.comparing(l -> l.getID())), + v.stream().filter(l -> l.getOutputs().isEmpty())) + .toArray(Lop[]::new); + + return createIDMapping(nodearray); + } + + private ArrayList<Lop> createIDMapping(Lop[] nodearray) { // Copy sorted nodes into "v" and construct a mapping between Lop IDs and sequence of numbers - v.clear(); + ArrayList<Lop> ret = new ArrayList<>(); IDMap.clear(); for (int i = 0; i < nodearray.length; i++) { - v.add(nodearray[i]); - IDMap.put(v.get(i).getID(), i); + ret.add(nodearray[i]); + IDMap.put(nodearray[i].getID(), i); } /* @@ -3642,17 +3650,14 @@ public class Dag<N extends Lop> * - and construct the list of reachable nodes from the node $u$ * - store the constructed reachability information in $u$.reachable[] boolean array */ - // - // for (int i = 0; i < nodearray.length; i++) { - boolean[] arr = v.get(i).create_reachable(nodearray.length); - Arrays.fill(arr, false); - dagDFS(v.get(i), arr); + dagDFS(nodearray[i], nodearray[i] + .create_reachable(nodearray.length)); } // print the nodes in sorted order if (LOG.isTraceEnabled()) { - for ( Lop vnode : v ) { + for ( Lop vnode : ret ) { StringBuilder sb = new StringBuilder(); sb.append(vnode.getID()); sb.append("("); @@ -3665,13 +3670,12 @@ public class Dag<N extends Lop> sb.append(","); } sb.append("), "); - LOG.trace(sb.toString()); } - LOG.trace("topological sort -- done"); } - + + return ret; } /**
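As a standalone illustration of the two-level ordering introduced by this patch (leaf/inner lops sorted by ID to force depth-first scheduling, followed by the DAG outputs), the following sketch mirrors the Stream.concat pattern of doTopologicalSortTwoLevelOrder above; the Node class, its methods, and the toy DAG are hypothetical stand-ins for the actual Lop API:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Stream;

public class TwoLevelOrderSketch {
  // hypothetical stand-in for a lop: only ID and output edges
  static final class Node {
    final long id;
    final List<Node> outputs = new ArrayList<>();
    Node(long id) { this.id = id; }
    void feeds(Node consumer) { outputs.add(consumer); }
    @Override public String toString() { return "lop-" + id; }
  }

  // partition into leaf/inner nodes (have outputs) sorted by ID, followed by DAG roots (no outputs);
  // sorting by ID approximates the depth-first order in which the lops were constructed
  static Node[] twoLevelOrder(List<Node> nodes) {
    return Stream.concat(
        nodes.stream().filter(n -> !n.outputs.isEmpty())
          .sorted(Comparator.comparingLong((Node n) -> n.id)),
        nodes.stream().filter(n -> n.outputs.isEmpty()))
      .toArray(Node[]::new);
  }

  public static void main(String[] args) {
    // toy DAG: two chains 1->2->3 and 4->5->6, where 3 and 6 are the outputs
    Node n1 = new Node(1), n2 = new Node(2), n3 = new Node(3);
    Node n4 = new Node(4), n5 = new Node(5), n6 = new Node(6);
    n1.feeds(n2); n2.feeds(n3); n4.feeds(n5); n5.feeds(n6);
    List<Node> nodes = Arrays.asList(n3, n1, n5, n2, n6, n4);

    // chain A's intermediates are scheduled before chain B's, and the outputs come last
    System.out.println(Arrays.toString(twoLevelOrder(nodes)));
    // -> [lop-1, lop-2, lop-4, lop-5, lop-3, lop-6]
  }
}

Compared to the level/ID ordering, each chain is now finished before the next one starts, so fewer intermediates are live at any point in time.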
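The supporting lop-side change is that Transform now receives all of its inputs at construction time, while ConvolutionTransform recomputes its properties via the new updateLopProperties() once all inputs are attached (Lop.setLevel() itself is now protected). A minimal sketch of the construct-with-all-inputs pattern, using a hypothetical simplified node class rather than the actual Lop/Transform hierarchy:

import java.util.ArrayList;
import java.util.List;

public class ArrayInputLopSketch {
  // hypothetical, simplified stand-in for the Lop/Transform hierarchy
  static class Node {
    final List<Node> inputs = new ArrayList<>();
    final List<Node> outputs = new ArrayList<>();
    private final int level;

    // all inputs are passed up front, so the level can be derived once here,
    // instead of callers invoking a separate setLevel() after a series of addInput() calls
    Node(Node... ins) {
      for (Node in : ins) {
        inputs.add(in);
        in.outputs.add(this);
      }
      level = inputs.stream().mapToInt(n -> n.level).max().orElse(-1) + 1;
    }
    int getLevel() { return level; }
  }

  public static void main(String[] args) {
    Node data = new Node();                            // leaf, level 0
    Node rows = new Node(), cols = new Node(), byrow = new Node();
    Node reshape = new Node(data, rows, cols, byrow);  // all inputs at construction
    System.out.println(reshape.getLevel());            // 1
  }
}

In the ReorgOp RESHAPE and SORT paths of this patch, the same idea appears as collecting the input lops into a Lop[] and passing them to the new Transform array constructors.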
