Repository: incubator-systemml Updated Branches: refs/heads/master af5be9b2e -> b2be71738
[SYSTEMML-1261] Improved transitive Spark exec type selection ba/ua This patch improves the transitive execution type selection of aggregate binary and aggregate unary operations (i.e., pulling operations that would fit in CP but whose inputs are executed in SPARK, transitively into SPARK as well). The major benefits are reduced memory pressure on the driver and reduced data transfer between executors and driver. Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/b2be7173 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/b2be7173 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/b2be7173 Branch: refs/heads/master Commit: b2be717382ee90de2d8c0a6152faa8bb0651bd41 Parents: af5be9b Author: Matthias Boehm <[email protected]> Authored: Tue Feb 14 01:07:19 2017 -0800 Committer: Matthias Boehm <[email protected]> Committed: Tue Feb 14 11:17:30 2017 -0800 ---------------------------------------------------------------------- .../java/org/apache/sysml/hops/AggBinaryOp.java | 25 +++++++++++++------- .../java/org/apache/sysml/hops/AggUnaryOp.java | 3 ++- 2 files changed, 18 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b2be7173/src/main/java/org/apache/sysml/hops/AggBinaryOp.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/AggBinaryOp.java b/src/main/java/org/apache/sysml/hops/AggBinaryOp.java index 67ea1ad..fe8a30d 100644 --- a/src/main/java/org/apache/sysml/hops/AggBinaryOp.java +++ b/src/main/java/org/apache/sysml/hops/AggBinaryOp.java @@ -437,17 +437,12 @@ public class AggBinaryOp extends Hop implements MultiThreadedHop checkAndSetInvalidCPDimsAndSize(); } - //spark-specific decision refinement (execute binary aggregate w/ left spark input and + //spark-specific decision refinement (execute binary aggregate w/ left or right spark input and //single parent also in spark because it's likely cheap and reduces data transfer) - if( _etype == ExecType.CP && _etypeForced != ExecType.CP - && !(getInput().get(0) instanceof ReorgOp && ((ReorgOp)getInput().get(0)).getOp()==ReOrgOp.TRANSPOSE) - && !(getInput().get(0) instanceof DataOp) //input is not checkpoint - && getInput().get(0).getParent().size()==1 //bagg is only parent - && !getInput().get(0).areDimsBelowThreshold() - && getInput().get(0).optFindExecType() == ExecType.SPARK - && getInput().get(0).getOutputMemEstimate()>getOutputMemEstimate() ) + if( _etype == ExecType.CP && _etypeForced != ExecType.CP && + (isApplicableForTransitiveSparkExecType(true) || isApplicableForTransitiveSparkExecType(false)) ) { - //pull unary aggregate into spark + //pull binary aggregate into spark _etype = ExecType.SPARK; } @@ -459,6 +454,18 @@ public class AggBinaryOp extends Hop implements MultiThreadedHop return _etype; } + private boolean isApplicableForTransitiveSparkExecType(boolean left) + throws HopsException + { + int index = left ? 0 : 1; + return !(getInput().get(index) instanceof DataOp && ((DataOp)getInput().get(index)).requiresCheckpoint()) + && !(getInput().get(index) instanceof ReorgOp && ((ReorgOp)getInput().get(index)).getOp()==ReOrgOp.TRANSPOSE) + && getInput().get(index).getParent().size()==1 //bagg is only parent + && !getInput().get(index).areDimsBelowThreshold() + && getInput().get(index).optFindExecType() == ExecType.SPARK + && getInput().get(index).getOutputMemEstimate()>getOutputMemEstimate(); + } + /** * TSMM: Determine if XtX pattern applies for this aggbinary and if yes * which type. http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b2be7173/src/main/java/org/apache/sysml/hops/AggUnaryOp.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/AggUnaryOp.java b/src/main/java/org/apache/sysml/hops/AggUnaryOp.java index 23e1da4..9964981 100644 --- a/src/main/java/org/apache/sysml/hops/AggUnaryOp.java +++ b/src/main/java/org/apache/sysml/hops/AggUnaryOp.java @@ -448,7 +448,8 @@ public class AggUnaryOp extends Hop implements MultiThreadedHop //single parent also in spark because it's likely cheap and reduces data transfer) if( _etype == ExecType.CP && _etypeForced != ExecType.CP && !(getInput().get(0) instanceof DataOp) //input is not checkpoint - && getInput().get(0).getParent().size()==1 //uagg is only parent + && (getInput().get(0).getParent().size()==1 //uagg is only parent, or + || !requiresAggregation(getInput().get(0), _direction)) //w/o agg && getInput().get(0).optFindExecType() == ExecType.SPARK ) { //pull unary aggregate into spark
