Repository: systemml Updated Branches: refs/heads/master 5837953e9 -> a640e57b0
[SYSTEMML-2152] Fix spark mapmm selection (missing dims constraint) This patch fixes the operator selection logic for the broadcast-based mapmm. Since the broadcast matrix is created in the driver, the matrix needs to satisfy the CP dimensions constraint of int32, which can be exceeded for ultra-sparse broadcast choices. In the future, we will also support the creation of broadcasts from blocked RDDs and HDFS files, which will allow us to remove the constraint. Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/a640e57b Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/a640e57b Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/a640e57b Branch: refs/heads/master Commit: a640e57b01ab186f9204ee1a256747dbbfe6ccaa Parents: 5837953 Author: Matthias Boehm <[email protected]> Authored: Fri Feb 16 17:17:33 2018 -0800 Committer: Matthias Boehm <[email protected]> Committed: Fri Feb 16 17:17:33 2018 -0800 ---------------------------------------------------------------------- .../java/org/apache/sysml/hops/AggBinaryOp.java | 24 +++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/a640e57b/src/main/java/org/apache/sysml/hops/AggBinaryOp.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/AggBinaryOp.java b/src/main/java/org/apache/sysml/hops/AggBinaryOp.java index 59cbf95..241dd86 100644 --- a/src/main/java/org/apache/sysml/hops/AggBinaryOp.java +++ b/src/main/java/org/apache/sysml/hops/AggBinaryOp.java @@ -1650,13 +1650,13 @@ public class AggBinaryOp extends Hop implements MultiThreadedHop } private MMultMethod optFindMMultMethodSpark( long m1_rows, long m1_cols, long m1_rpb, long m1_cpb, long m1_nnz, - long m2_rows, long m2_cols, long m2_rpb, long m2_cpb, long m2_nnz, - 
MMTSJType mmtsj, ChainType chainType, boolean leftPMInput, boolean tmmRewrite ) - { + long m2_rows, long m2_cols, long m2_rpb, long m2_cpb, long m2_nnz, + MMTSJType mmtsj, ChainType chainType, boolean leftPMInput, boolean tmmRewrite ) + { //Notes: Any broadcast needs to fit twice in local memory because we partition the input in cp, //and needs to fit once in executor broadcast memory. The 2GB broadcast constraint is no longer //required because the max_int byte buffer constraint has been fixed in Spark 1.4 - double memBudgetExec = MAPMULT_MEM_MULTIPLIER * SparkExecutionContext.getBroadcastMemoryBudget(); + double memBudgetExec = MAPMULT_MEM_MULTIPLIER * SparkExecutionContext.getBroadcastMemoryBudget(); double memBudgetLocal = OptimizerUtils.getLocalMemBudget(); //reset spark broadcast memory information (for concurrent parfor jobs, awareness of additional @@ -1699,11 +1699,11 @@ public class AggBinaryOp extends Hop implements MultiThreadedHop + OptimizerUtils.estimateSize(m1_cols, m2_cols)) < memBudgetLocal ) { _spBroadcastMemEstimate = 2*(OptimizerUtils.estimateSize(m1_rows, m2_cols) - + OptimizerUtils.estimateSize(m1_cols, m2_cols)); + + OptimizerUtils.estimateSize(m1_cols, m2_cols)); return MMultMethod.MAPMM_CHAIN; } } - } + } // Step 3: check for PMM (permutation matrix needs to fit into mapper memory) // (needs to be checked before mapmult for consistency with removeEmpty compilation @@ -1735,11 +1735,13 @@ public class AggBinaryOp extends Hop implements MultiThreadedHop { //apply map mult if one side fits in remote task memory //(if so pick smaller input for distributed cache) - if( m1SizeP < m2SizeP && m1_rows>=0 && m1_cols>=0) { + //TODO relax requirement of valid CP dimensions once we support broadcast creation from files/RDDs + if( m1SizeP < m2SizeP && m1_rows>=0 && m1_cols>=0 + && OptimizerUtils.isValidCPDimensions(m1_rows, m1_cols) ) { _spBroadcastMemEstimate = m1Size+m1SizeP; return MMultMethod.MAPMM_L; } - else { + else if( 
OptimizerUtils.isValidCPDimensions(m2_rows, m2_cols) ) { _spBroadcastMemEstimate = m2Size+m2SizeP; return MMultMethod.MAPMM_R; } @@ -1771,17 +1773,17 @@ public class AggBinaryOp extends Hop implements MultiThreadedHop // Step 7: check for ZIPMM // If t(X)%*%y -> t(t(y)%*%X) rewrite and ncol(X)<blocksize if( tmmRewrite && m1_rows >= 0 && m1_rows <= m1_rpb //blocksize constraint left - && m2_cols >= 0 && m2_cols <= m2_cpb ) //blocksize constraint right + && m2_cols >= 0 && m2_cols <= m2_cpb ) //blocksize constraint right { return MMultMethod.ZIPMM; } - + // Step 8: Decide CPMM vs RMM based on io costs //estimate shuffle costs weighted by parallelism //TODO currently we reuse the mr estimates, these need to be fine-tune for our spark operators double rmm_costs = getRMMCostEstimate(m1_rows, m1_cols, m1_rpb, m1_cpb, m2_rows, m2_cols, m2_rpb, m2_cpb); double cpmm_costs = getCPMMCostEstimate(m1_rows, m1_cols, m1_rpb, m1_cpb, m2_rows, m2_cols, m2_rpb, m2_cpb); - + //final mmult method decision if ( cpmm_costs < rmm_costs ) return MMultMethod.CPMM;
