Repository: systemml
Updated Branches:
  refs/heads/master 5837953e9 -> a640e57b0


[SYSTEMML-2152] Fix spark mapmm selection (missing dims constraint)

This patch fixes the operator selection logic for the broadcast-based
mapmm. Since the broadcast matrix is created in the driver, it needs to
satisfy the CP dimension constraint of int32 (i.e., each dimension must
fit into a signed 32-bit integer), which can be exceeded when
ultra-sparse inputs are chosen for broadcast.

In the future, we will also support the creation of broadcasts from
blocked RDDs and HDFS files, which will allow us to remove this constraint.
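
For illustration, the following is a minimal sketch of the dimension check
this selection now depends on. The class and method names are hypothetical
(not SystemML's actual OptimizerUtils.isValidCPDimensions implementation);
it only conveys the assumed semantics that a matrix can be broadcast from
the driver only if each dimension fits into a signed 32-bit integer, no
matter how small its ultra-sparse size estimate is.

// Hypothetical sketch of the int32 dimension constraint (assumed semantics,
// not the actual SystemML code).
public class CpDimensionCheckSketch {
    /** Both dimensions must be known (>= 0) and fit into a signed int32. */
    public static boolean fitsCpDimensions(long rows, long cols) {
        return rows >= 0 && cols >= 0
            && rows <= Integer.MAX_VALUE && cols <= Integer.MAX_VALUE;
    }

    public static void main(String[] args) {
        // An ultra-sparse 10B x 10 matrix may have a tiny size estimate,
        // yet its row dimension exceeds int32, so it must not be selected
        // as a broadcast input for mapmm.
        System.out.println(fitsCpDimensions(10_000_000_000L, 10)); // false
        System.out.println(fitsCpDimensions(1_000_000L, 1_000L));  // true
    }
}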


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/a640e57b
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/a640e57b
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/a640e57b

Branch: refs/heads/master
Commit: a640e57b01ab186f9204ee1a256747dbbfe6ccaa
Parents: 5837953
Author: Matthias Boehm <[email protected]>
Authored: Fri Feb 16 17:17:33 2018 -0800
Committer: Matthias Boehm <[email protected]>
Committed: Fri Feb 16 17:17:33 2018 -0800

----------------------------------------------------------------------
 .../java/org/apache/sysml/hops/AggBinaryOp.java | 24 +++++++++++---------
 1 file changed, 13 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/a640e57b/src/main/java/org/apache/sysml/hops/AggBinaryOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/AggBinaryOp.java b/src/main/java/org/apache/sysml/hops/AggBinaryOp.java
index 59cbf95..241dd86 100644
--- a/src/main/java/org/apache/sysml/hops/AggBinaryOp.java
+++ b/src/main/java/org/apache/sysml/hops/AggBinaryOp.java
@@ -1650,13 +1650,13 @@ public class AggBinaryOp extends Hop implements MultiThreadedHop
        }
        
        private MMultMethod optFindMMultMethodSpark( long m1_rows, long m1_cols, long m1_rpb, long m1_cpb, long m1_nnz, 
-            long m2_rows, long m2_cols, long m2_rpb, long m2_cpb, long m2_nnz,
-            MMTSJType mmtsj, ChainType chainType, boolean leftPMInput, boolean tmmRewrite ) 
-       {       
+               long m2_rows, long m2_cols, long m2_rpb, long m2_cpb, long m2_nnz,
+               MMTSJType mmtsj, ChainType chainType, boolean leftPMInput, boolean tmmRewrite ) 
+       {
                //Notes: Any broadcast needs to fit twice in local memory because we partition the input in cp,
                //and needs to fit once in executor broadcast memory. The 2GB broadcast constraint is no longer
                //required because the max_int byte buffer constraint has been fixed in Spark 1.4 
-               double memBudgetExec = MAPMULT_MEM_MULTIPLIER * SparkExecutionContext.getBroadcastMemoryBudget();               
+               double memBudgetExec = MAPMULT_MEM_MULTIPLIER * SparkExecutionContext.getBroadcastMemoryBudget();
                double memBudgetLocal = OptimizerUtils.getLocalMemBudget();
 
                //reset spark broadcast memory information (for concurrent parfor jobs, awareness of additional 
@@ -1699,11 +1699,11 @@ public class AggBinaryOp extends Hop implements MultiThreadedHop
                                           + OptimizerUtils.estimateSize(m1_cols, m2_cols)) < memBudgetLocal )
                                {
                                        _spBroadcastMemEstimate = 2*(OptimizerUtils.estimateSize(m1_rows, m2_cols) 
-                                                                                          + OptimizerUtils.estimateSize(m1_cols, m2_cols));
+                                               + OptimizerUtils.estimateSize(m1_cols, m2_cols));
                                        return MMultMethod.MAPMM_CHAIN;
                                }
                        }
-               }               
+               }
                
                // Step 3: check for PMM (permutation matrix needs to fit into mapper memory)
                // (needs to be checked before mapmult for consistency with removeEmpty compilation 
@@ -1735,11 +1735,13 @@ public class AggBinaryOp extends Hop implements MultiThreadedHop
                {
                        //apply map mult if one side fits in remote task memory 
                        //(if so pick smaller input for distributed cache)
-                       if( m1SizeP < m2SizeP && m1_rows>=0 && m1_cols>=0) {
+                       //TODO relax requirement of valid CP dimensions once we support broadcast creation from files/RDDs
+                       if( m1SizeP < m2SizeP && m1_rows>=0 && m1_cols>=0
+                               && OptimizerUtils.isValidCPDimensions(m1_rows, m1_cols) ) {
                                _spBroadcastMemEstimate = m1Size+m1SizeP;
                                return MMultMethod.MAPMM_L;
                        }
-                       else {
+                       else if( OptimizerUtils.isValidCPDimensions(m2_rows, m2_cols) ) {
                                _spBroadcastMemEstimate = m2Size+m2SizeP;
                                return MMultMethod.MAPMM_R;
                        }
@@ -1771,17 +1773,17 @@ public class AggBinaryOp extends Hop implements MultiThreadedHop
                // Step 7: check for ZIPMM
                // If t(X)%*%y -> t(t(y)%*%X) rewrite and ncol(X)<blocksize
                if( tmmRewrite && m1_rows >= 0 && m1_rows <= m1_rpb  //blocksize constraint left
-                       && m2_cols >= 0 && m2_cols <= m2_cpb )           //blocksize constraint right   
+                       && m2_cols >= 0 && m2_cols <= m2_cpb )           //blocksize constraint right
                {
                        return MMultMethod.ZIPMM;
                }
-                       
+               
                // Step 8: Decide CPMM vs RMM based on io costs
                //estimate shuffle costs weighted by parallelism
                //TODO currently we reuse the mr estimates, these need to be fine-tune for our spark operators
                double rmm_costs = getRMMCostEstimate(m1_rows, m1_cols, m1_rpb, m1_cpb, m2_rows, m2_cols, m2_rpb, m2_cpb);
                double cpmm_costs = getCPMMCostEstimate(m1_rows, m1_cols, m1_rpb, m1_cpb, m2_rows, m2_cols, m2_rpb, m2_cpb);
-                               
+               
                //final mmult method decision 
                if ( cpmm_costs < rmm_costs ) 
                        return MMultMethod.CPMM;
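
For readability, here is a hedged, self-contained sketch of the patched
Step 4 side selection shown in the diff above. The class and method names
are hypothetical and simplified from AggBinaryOp: the smaller side is
broadcast only if its dimensions are valid CP dimensions; otherwise the
other side is used if it qualifies, and if neither qualifies the selection
falls through to the later cost-based CPMM/RMM decision.

// Simplified, hypothetical mirror of the patched side selection; not the
// actual AggBinaryOp code.
public class MapmmSideSelectionSketch {
    enum Method { MAPMM_L, MAPMM_R, NONE }

    /** Assumed semantics of the CP dimension constraint (fits into int32). */
    static boolean validCpDims(long rows, long cols) {
        return rows >= 0 && cols >= 0
            && rows <= Integer.MAX_VALUE && cols <= Integer.MAX_VALUE;
    }

    static Method choose(long m1Rows, long m1Cols, double m1SizeP,
                         long m2Rows, long m2Cols, double m2SizeP) {
        if (m1SizeP < m2SizeP && validCpDims(m1Rows, m1Cols))
            return Method.MAPMM_L;  // broadcast the (smaller) left input
        else if (validCpDims(m2Rows, m2Cols))
            return Method.MAPMM_R;  // broadcast the right input
        return Method.NONE;         // defer to CPMM/RMM cost comparison
    }
}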
