Repository: systemml Updated Branches: refs/heads/master 4416b5e51 -> aefab8f8c
[SYSTEMML-2030] Improved transitive exec type selection spark mm This patch improves the transitive execution type selection of spark matrix multiply for cases where the input is produced by a spark transpose operation and the transpose-mm rewrite is not applicable due to memory constraints. On the perftest L2SVM 800GB sparse icpt 1, this patch improved performance from 2,420s to 223s. Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/fbec4795 Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/fbec4795 Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/fbec4795 Branch: refs/heads/master Commit: fbec47952122fb3ffd1193568035daa052618102 Parents: 4416b5e Author: Matthias Boehm <mboe...@gmail.com> Authored: Wed Nov 29 16:24:46 2017 -0800 Committer: Matthias Boehm <mboe...@gmail.com> Committed: Wed Nov 29 19:11:52 2017 -0800 ---------------------------------------------------------------------- .../java/org/apache/sysml/hops/AggBinaryOp.java | 54 ++++++++------------ 1 file changed, 21 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/fbec4795/src/main/java/org/apache/sysml/hops/AggBinaryOp.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/AggBinaryOp.java b/src/main/java/org/apache/sysml/hops/AggBinaryOp.java index d733d6a..2ff432b 100644 --- a/src/main/java/org/apache/sysml/hops/AggBinaryOp.java +++ b/src/main/java/org/apache/sysml/hops/AggBinaryOp.java @@ -431,14 +431,14 @@ public class AggBinaryOp extends Hop implements MultiThreadedHop } @Override - protected ExecType optFindExecType() + protected ExecType optFindExecType() throws HopsException { checkAndSetForcedPlatform(); ExecType REMOTE = OptimizerUtils.isSparkExecutionMode() ? ExecType.SPARK : ExecType.MR; - if( _etypeForced != null ) + if( _etypeForced != null ) { _etype = _etypeForced; } @@ -472,8 +472,9 @@ public class AggBinaryOp extends Hop implements MultiThreadedHop //spark-specific decision refinement (execute binary aggregate w/ left or right spark input and //single parent also in spark because it's likely cheap and reduces data transfer) - if( _etype == ExecType.CP && _etypeForced != ExecType.CP && - (isApplicableForTransitiveSparkExecType(true) || isApplicableForTransitiveSparkExecType(false)) ) + if( _etype == ExecType.CP && _etypeForced != ExecType.CP + && (isApplicableForTransitiveSparkExecType(true) + || isApplicableForTransitiveSparkExecType(false)) ) { //pull binary aggregate into spark _etype = ExecType.SPARK; @@ -489,9 +490,10 @@ public class AggBinaryOp extends Hop implements MultiThreadedHop throws HopsException { int index = left ? 0 : 1; - return !(getInput().get(index) instanceof DataOp && ((DataOp)getInput().get(index)).requiresCheckpoint()) - && !HopRewriteUtils.isTransposeOperation(getInput().get(index)) - && getInput().get(index).getParent().size()==1 //bagg is only parent + return !(getInput().get(index) instanceof DataOp && ((DataOp)getInput().get(index)).requiresCheckpoint()) + && (!HopRewriteUtils.isTransposeOperation(getInput().get(index)) + || (left && !isLeftTransposeRewriteApplicable(true, false))) + && getInput().get(index).getParent().size()==1 //bagg is only parent && !getInput().get(index).areDimsBelowThreshold() && getInput().get(index).optFindExecType() == ExecType.SPARK && getInput().get(index).getOutputMemEstimate()>getOutputMemEstimate(); @@ -660,35 +662,21 @@ public class AggBinaryOp extends Hop implements MultiThreadedHop private void constructCPLopsMM(ExecType et) throws HopsException, LopsException - { + { Lop matmultCP = null; - + if (et == ExecType.GPU) { Hop h1 = getInput().get(0); Hop h2 = getInput().get(1); - Lop left; Lop right; - boolean isLeftTransposed; boolean isRightTransposed; - if( HopRewriteUtils.isTransposeOperation(h1) ) { - isLeftTransposed = true; - left = h1.getInput().get(0).constructLops(); - } - else { - isLeftTransposed = false; - left = h1.constructLops(); - } - if( HopRewriteUtils.isTransposeOperation(h2) ) { - isRightTransposed = true; - right = h2.getInput().get(0).constructLops(); - } - else { - isRightTransposed = false; - right = h2.constructLops(); - } - - matmultCP = new Binary(left, right, - Binary.OperationTypes.MATMULT, getDataType(), getValueType(), et, isLeftTransposed, isRightTransposed); + boolean leftTrans = HopRewriteUtils.isTransposeOperation(h1); + boolean rightTrans = HopRewriteUtils.isTransposeOperation(h1); + Lop left = !leftTrans ? h1.constructLops() : + h1.getInput().get(0).constructLops(); + Lop right = !rightTrans ? h2.constructLops() : + h2.getInput().get(0).constructLops(); + matmultCP = new Binary(left, right, Binary.OperationTypes.MATMULT, + getDataType(), getValueType(), et, leftTrans, rightTrans); setOutputDimensions(matmultCP); - setNnz(-1); } else { if( isLeftTransposeRewriteApplicable(true, false) ) { @@ -696,8 +684,8 @@ public class AggBinaryOp extends Hop implements MultiThreadedHop } else { int k = OptimizerUtils.getConstrainedNumThreads(_maxNumThreads); - matmultCP = new Binary(getInput().get(0).constructLops(),getInput().get(1).constructLops(), - Binary.OperationTypes.MATMULT, getDataType(), getValueType(), et, k); + matmultCP = new Binary(getInput().get(0).constructLops(),getInput().get(1).constructLops(), + Binary.OperationTypes.MATMULT, getDataType(), getValueType(), et, k); } setOutputDimensions(matmultCP); }