Repository: systemml
Updated Branches:
  refs/heads/master 094f5551d -> 696e79218


[SYSTEMML-2218] Improved spark mapmm (avoid parallelize-repartition)

This patch improves the spark mapmm instruction (broadcast-based matrix
multiply) by avoiding the unnecessary shuffle for repartitioning - which
is used to guarantee the output partition size - if the input is a
parallelized RDD. For this scenario, we now create the parallelized RDD
with the right number of partitions.
 

Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/c16738d2
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/c16738d2
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/c16738d2

Branch: refs/heads/master
Commit: c16738d22c0e7c0917e413f3c8cf5db16d76d045
Parents: 094f555
Author: Matthias Boehm <[email protected]>
Authored: Fri Mar 30 18:54:47 2018 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Fri Mar 30 18:54:47 2018 -0700

----------------------------------------------------------------------
 .../context/SparkExecutionContext.java          | 49 +++++++++++---------
 .../instructions/spark/MapmmSPInstruction.java  | 10 ++--
 2 files changed, 33 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/c16738d2/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
index f898f28..a7211ce 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
@@ -291,7 +291,13 @@ public class SparkExecutionContext extends ExecutionContext
        @SuppressWarnings("unchecked")
        public JavaPairRDD<MatrixIndexes,MatrixBlock> 
getBinaryBlockRDDHandleForVariable( String varname ) {
                return (JavaPairRDD<MatrixIndexes,MatrixBlock>)
-                       getRDDHandleForVariable( varname, 
InputInfo.BinaryBlockInputInfo);
+                       getRDDHandleForVariable( varname, 
InputInfo.BinaryBlockInputInfo, -1);
+       }
+       
+       @SuppressWarnings("unchecked")
+       public JavaPairRDD<MatrixIndexes,MatrixBlock> 
getBinaryBlockRDDHandleForVariable( String varname, int numParts ) {
+               return (JavaPairRDD<MatrixIndexes,MatrixBlock>)
+                       getRDDHandleForVariable( varname, 
InputInfo.BinaryBlockInputInfo, numParts);
        }
 
        /**
@@ -304,15 +310,19 @@ public class SparkExecutionContext extends 
ExecutionContext
        @SuppressWarnings("unchecked")
        public JavaPairRDD<Long,FrameBlock> 
getFrameBinaryBlockRDDHandleForVariable( String varname ) {
                JavaPairRDD<Long,FrameBlock> out = 
(JavaPairRDD<Long,FrameBlock>)
-                       getRDDHandleForVariable( varname, 
InputInfo.BinaryBlockInputInfo);
+                       getRDDHandleForVariable( varname, 
InputInfo.BinaryBlockInputInfo, -1);
                return out;
        }
 
        public JavaPairRDD<?,?> getRDDHandleForVariable( String varname, 
InputInfo inputInfo ) {
+               return getRDDHandleForVariable(varname, inputInfo, -1);
+       }
+       
+       public JavaPairRDD<?,?> getRDDHandleForVariable( String varname, 
InputInfo inputInfo, int numParts ) {
                Data dat = getVariable(varname);
                if( dat instanceof MatrixObject ) {
                        MatrixObject mo = getMatrixObject(varname);
-                       return getRDDHandleForMatrixObject(mo, inputInfo);
+                       return getRDDHandleForMatrixObject(mo, inputInfo, 
numParts);
                }
                else if( dat instanceof FrameObject ) {
                        FrameObject fo = getFrameObject(varname);
@@ -323,16 +333,12 @@ public class SparkExecutionContext extends 
ExecutionContext
                }
        }
 
-       /**
-        * This call returns an RDD handle for a given matrix object. This 
includes
-        * the creation of RDDs for in-memory or binary-block HDFS data.
-        *
-        * @param mo matrix object
-        * @param inputInfo input info
-        * @return JavaPairRDD handle for a matrix object
-        */
-       @SuppressWarnings("unchecked")
        public JavaPairRDD<?,?> getRDDHandleForMatrixObject( MatrixObject mo, 
InputInfo inputInfo ) {
+               return getRDDHandleForMatrixObject(mo, inputInfo, -1);
+       }
+       
+       @SuppressWarnings("unchecked")
+       public JavaPairRDD<?,?> getRDDHandleForMatrixObject( MatrixObject mo, 
InputInfo inputInfo, int numParts ) {
                //NOTE: MB this logic should be integrated into MatrixObject
                //However, for now we cannot assume that spark libraries are
                //always available and hence only store generic references in
@@ -366,7 +372,7 @@ public class SparkExecutionContext extends ExecutionContext
                        }
                        else { //default case
                                MatrixBlock mb = mo.acquireRead(); //pin matrix 
in memory
-                               rdd = toMatrixJavaPairRDD(sc, mb, 
(int)mo.getNumRowsPerBlock(), (int)mo.getNumColumnsPerBlock());
+                               rdd = toMatrixJavaPairRDD(sc, mb, 
(int)mo.getNumRowsPerBlock(), (int)mo.getNumColumnsPerBlock(), numParts);
                                mo.release(); //unpin matrix
                                _parRDDs.registerRDD(rdd.id(), 
OptimizerUtils.estimatePartitionedSizeExactSparsity(mc), true);
                        }
@@ -657,16 +663,11 @@ public class SparkExecutionContext extends 
ExecutionContext
                obj.setRDDHandle( rddhandle );
        }
 
-       /**
-        * Utility method for creating an RDD out of an in-memory matrix block.
-        *
-        * @param sc java spark context
-        * @param src matrix block
-        * @param brlen block row length
-        * @param bclen block column length
-        * @return JavaPairRDD handle to matrix block
-        */
        public static JavaPairRDD<MatrixIndexes,MatrixBlock> 
toMatrixJavaPairRDD(JavaSparkContext sc, MatrixBlock src, int brlen, int bclen) 
{
+               return toMatrixJavaPairRDD(sc, src, brlen, bclen, -1);
+       }
+       
+       public static JavaPairRDD<MatrixIndexes,MatrixBlock> 
toMatrixJavaPairRDD(JavaSparkContext sc, MatrixBlock src, int brlen, int bclen, 
int numParts) {
                long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
                List<Tuple2<MatrixIndexes,MatrixBlock>> list = null;
 
@@ -681,7 +682,9 @@ public class SparkExecutionContext extends ExecutionContext
                                .collect(Collectors.toList());
                }
 
-               JavaPairRDD<MatrixIndexes,MatrixBlock> result = 
sc.parallelizePairs(list);
+               JavaPairRDD<MatrixIndexes,MatrixBlock> result = (numParts > 1) ?
+                       sc.parallelizePairs(list, numParts) : 
sc.parallelizePairs(list);
+               
                if (DMLScript.STATISTICS) {
                        Statistics.accSparkParallelizeTime(System.nanoTime() - 
t0);
                        Statistics.incSparkParallelizeCount(1);

http://git-wip-us.apache.org/repos/asf/systemml/blob/c16738d2/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
index 8f8f576..125f359 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
@@ -97,8 +97,11 @@ public class MapmmSPInstruction extends BinarySPInstruction {
                MatrixCharacteristics mcRdd = 
sec.getMatrixCharacteristics(rddVar);
                MatrixCharacteristics mcBc = 
sec.getMatrixCharacteristics(bcastVar);
                
-               //get input rdd
-               JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = 
sec.getBinaryBlockRDDHandleForVariable(rddVar);
+               //get input rdd with preferred number of partitions to avoid 
unnecessary repartition
+               JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = 
sec.getBinaryBlockRDDHandleForVariable(rddVar,
+                       (requiresFlatMapFunction(type, mcBc) && 
requiresRepartitioning(
+                       type, mcRdd, mcBc, 
sec.getSparkContext().defaultParallelism())) ?
+                       getNumRepartitioning(type, mcRdd, mcBc) : -1);
                
                //investigate if a repartitioning - including a potential flip 
of broadcast and rdd 
                //inputs - is required to ensure moderately sized output 
partitions (2GB limitation)
@@ -216,7 +219,8 @@ public class MapmmSPInstruction extends BinarySPInstruction 
{
                boolean isLargeOutput = 
(OptimizerUtils.estimatePartitionedSizeExactSparsity(isLeft?mcBc.getRows():mcRdd.getRows(),
                                isLeft?mcRdd.getCols():mcBc.getCols(), 
isLeft?mcBc.getRowsPerBlock():mcRdd.getRowsPerBlock(),
                                
isLeft?mcRdd.getColsPerBlock():mcBc.getColsPerBlock(), 1.0) / numPartitions) > 
1024*1024*1024; 
-               return isOuter && isLargeOutput && mcRdd.dimsKnown() && 
mcBc.dimsKnown();
+               return isOuter && isLargeOutput && mcRdd.dimsKnown() && 
mcBc.dimsKnown()
+                       && numPartitions < getNumRepartitioning(type, mcRdd, 
mcBc);
        }
 
        /**

Reply via email to