Repository: incubator-systemml
Updated Branches:
  refs/heads/master af5be9b2e -> b2be71738


[SYSTEMML-1261] Improved transitive Spark exec type selection for binary/unary aggregates (ba/ua)

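This patch improves the transitive execution type selection of aggregate
binary and aggregate unary operations, i.e., operations that would fit in
CP but whose inputs are executed in SPARK are now pulled into SPARK as
well. Aggregate binary operations now consider either the left or the
right input (previously only the left), and aggregate unary operations are
also pulled into SPARK when their input has multiple consumers, as long as
no aggregation is required. The major benefits are reduced memory pressure
on the driver and reduced data transfer between executors and driver.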

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/b2be7173
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/b2be7173
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/b2be7173

Branch: refs/heads/master
Commit: b2be717382ee90de2d8c0a6152faa8bb0651bd41
Parents: af5be9b
Author: Matthias Boehm <[email protected]>
Authored: Tue Feb 14 01:07:19 2017 -0800
Committer: Matthias Boehm <[email protected]>
Committed: Tue Feb 14 11:17:30 2017 -0800

----------------------------------------------------------------------
 .../java/org/apache/sysml/hops/AggBinaryOp.java | 25 +++++++++++++-------
 .../java/org/apache/sysml/hops/AggUnaryOp.java  |  3 ++-
 2 files changed, 18 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b2be7173/src/main/java/org/apache/sysml/hops/AggBinaryOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/AggBinaryOp.java b/src/main/java/org/apache/sysml/hops/AggBinaryOp.java
index 67ea1ad..fe8a30d 100644
--- a/src/main/java/org/apache/sysml/hops/AggBinaryOp.java
+++ b/src/main/java/org/apache/sysml/hops/AggBinaryOp.java
@@ -437,17 +437,12 @@ public class AggBinaryOp extends Hop implements MultiThreadedHop
                        checkAndSetInvalidCPDimsAndSize();
                }
                
-               //spark-specific decision refinement (execute binary aggregate 
w/ left spark input and 
+               //spark-specific decision refinement (execute binary aggregate 
w/ left or right spark input and 
                //single parent also in spark because it's likely cheap and 
reduces data transfer)
-               if( _etype == ExecType.CP && _etypeForced != ExecType.CP        
-                       && !(getInput().get(0) instanceof ReorgOp && 
((ReorgOp)getInput().get(0)).getOp()==ReOrgOp.TRANSPOSE)
-                       && !(getInput().get(0) instanceof DataOp)  //input is 
not checkpoint
-                       && getInput().get(0).getParent().size()==1 //bagg is 
only parent        
-                       && !getInput().get(0).areDimsBelowThreshold() 
-                       && getInput().get(0).optFindExecType() == ExecType.SPARK
-                       && 
getInput().get(0).getOutputMemEstimate()>getOutputMemEstimate() )    
+               if( _etype == ExecType.CP && _etypeForced != ExecType.CP &&
+                       (isApplicableForTransitiveSparkExecType(true) || 
isApplicableForTransitiveSparkExecType(false)) )    
                {
-                       //pull unary aggregate into spark 
+                       //pull binary aggregate into spark 
                        _etype = ExecType.SPARK;
                }
                
@@ -459,6 +454,18 @@ public class AggBinaryOp extends Hop implements MultiThreadedHop
                return _etype;
        }
        
+       private boolean isApplicableForTransitiveSparkExecType(boolean left) 
+               throws HopsException 
+       {
+               int index = left ? 0 : 1;
+               return !(getInput().get(index) instanceof DataOp && 
((DataOp)getInput().get(index)).requiresCheckpoint())  
+                       && !(getInput().get(index) instanceof ReorgOp && 
((ReorgOp)getInput().get(index)).getOp()==ReOrgOp.TRANSPOSE)
+                       && getInput().get(index).getParent().size()==1 //bagg 
is only parent    
+                       && !getInput().get(index).areDimsBelowThreshold() 
+                       && getInput().get(index).optFindExecType() == 
ExecType.SPARK
+                       && 
getInput().get(index).getOutputMemEstimate()>getOutputMemEstimate();
+       }
+       
        /**
         * TSMM: Determine if XtX pattern applies for this aggbinary and if yes
         * which type. 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b2be7173/src/main/java/org/apache/sysml/hops/AggUnaryOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/AggUnaryOp.java b/src/main/java/org/apache/sysml/hops/AggUnaryOp.java
index 23e1da4..9964981 100644
--- a/src/main/java/org/apache/sysml/hops/AggUnaryOp.java
+++ b/src/main/java/org/apache/sysml/hops/AggUnaryOp.java
@@ -448,7 +448,8 @@ public class AggUnaryOp extends Hop implements MultiThreadedHop
                //single parent also in spark because it's likely cheap and 
reduces data transfer)
                if( _etype == ExecType.CP && _etypeForced != ExecType.CP
                        && !(getInput().get(0) instanceof DataOp)  //input is 
not checkpoint
-                       && getInput().get(0).getParent().size()==1 //uagg is 
only parent
+                       && (getInput().get(0).getParent().size()==1 //uagg is 
only parent, or 
+                          || !requiresAggregation(getInput().get(0), 
_direction)) //w/o agg
                        && getInput().get(0).optFindExecType() == 
ExecType.SPARK )                                      
                {
                        //pull unary aggregate into spark 

Reply via email to