Repository: systemml
Updated Branches:
  refs/heads/master 094f5551d -> 696e79218

[SYSTEMML-2218] Improved spark mapmm (avoid parallelize-repartition)

This patch improves the Spark mapmm instruction (broadcast-based matrix
multiply) by avoiding an unnecessary shuffle for repartitioning - which is
used to guarantee the output partition size - if the input is a parallelized
RDD. For this scenario, we now create the parallelized RDD with the right
number of partitions.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/c16738d2
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/c16738d2
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/c16738d2

Branch: refs/heads/master
Commit: c16738d22c0e7c0917e413f3c8cf5db16d76d045
Parents: 094f555
Author: Matthias Boehm <[email protected]>
Authored: Fri Mar 30 18:54:47 2018 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Fri Mar 30 18:54:47 2018 -0700

----------------------------------------------------------------------
 .../context/SparkExecutionContext.java          | 49 +++++++++++---------
 .../instructions/spark/MapmmSPInstruction.java  | 10 ++--
 2 files changed, 33 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
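
For context, the following is a minimal, self-contained sketch of the
pattern this patch applies - plain Spark, not SystemML code, with all names
illustrative. Calling repartition() after parallelize() shuffles every
element over the network just to reach the target partition count, whereas
passing that count to parallelize() splits the driver-side data directly
and avoids the shuffle entirely.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class ParallelizePartitionsSketch {
	public static void main(String[] args) {
		SparkConf conf = new SparkConf()
			.setAppName("parallelize-sketch").setMaster("local[4]");
		try( JavaSparkContext sc = new JavaSparkContext(conf) ) {
			List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
			int numParts = 4; //target partition count, known up front

			//before: default partitioning, then a repartition that
			//shuffles all data to guarantee the partition count
			JavaRDD<Integer> viaShuffle = sc.parallelize(data)
				.repartition(numParts);

			//after: the driver-side list is split into numParts
			//partitions at creation time, so no shuffle is needed
			JavaRDD<Integer> direct = sc.parallelize(data, numParts);

			System.out.println(viaShuffle.getNumPartitions()
				+ " == " + direct.getNumPartitions()); //prints 4 == 4
		}
	}
}

Both RDDs end up with four partitions, but only the first pays for a full
shuffle; the patch below applies the same idea to the block list produced
by toMatrixJavaPairRDD.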

http://git-wip-us.apache.org/repos/asf/systemml/blob/c16738d2/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
index f898f28..a7211ce 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
@@ -291,7 +291,13 @@ public class SparkExecutionContext extends ExecutionContext
 	@SuppressWarnings("unchecked")
 	public JavaPairRDD<MatrixIndexes,MatrixBlock> getBinaryBlockRDDHandleForVariable( String varname ) {
 		return (JavaPairRDD<MatrixIndexes,MatrixBlock>)
-			getRDDHandleForVariable( varname, InputInfo.BinaryBlockInputInfo);
+			getRDDHandleForVariable( varname, InputInfo.BinaryBlockInputInfo, -1);
+	}
+
+	@SuppressWarnings("unchecked")
+	public JavaPairRDD<MatrixIndexes,MatrixBlock> getBinaryBlockRDDHandleForVariable( String varname, int numParts ) {
+		return (JavaPairRDD<MatrixIndexes,MatrixBlock>)
+			getRDDHandleForVariable( varname, InputInfo.BinaryBlockInputInfo, numParts);
 	}
 
 	/**
@@ -304,15 +310,19 @@ public class SparkExecutionContext extends ExecutionContext
 	@SuppressWarnings("unchecked")
 	public JavaPairRDD<Long,FrameBlock> getFrameBinaryBlockRDDHandleForVariable( String varname ) {
 		JavaPairRDD<Long,FrameBlock> out = (JavaPairRDD<Long,FrameBlock>)
-			getRDDHandleForVariable( varname, InputInfo.BinaryBlockInputInfo);
+			getRDDHandleForVariable( varname, InputInfo.BinaryBlockInputInfo, -1);
 		return out;
 	}
 
 	public JavaPairRDD<?,?> getRDDHandleForVariable( String varname, InputInfo inputInfo ) {
+		return getRDDHandleForVariable(varname, inputInfo, -1);
+	}
+
+	public JavaPairRDD<?,?> getRDDHandleForVariable( String varname, InputInfo inputInfo, int numParts ) {
 		Data dat = getVariable(varname);
 		if( dat instanceof MatrixObject ) {
 			MatrixObject mo = getMatrixObject(varname);
-			return getRDDHandleForMatrixObject(mo, inputInfo);
+			return getRDDHandleForMatrixObject(mo, inputInfo, numParts);
 		}
 		else if( dat instanceof FrameObject ) {
 			FrameObject fo = getFrameObject(varname);
@@ -323,16 +333,12 @@ public class SparkExecutionContext extends ExecutionContext
 		}
 	}
 
-	/**
-	 * This call returns an RDD handle for a given matrix object. This includes
-	 * the creation of RDDs for in-memory or binary-block HDFS data.
-	 *
-	 * @param mo matrix object
-	 * @param inputInfo input info
-	 * @return JavaPairRDD handle for a matrix object
-	 */
-	@SuppressWarnings("unchecked")
 	public JavaPairRDD<?,?> getRDDHandleForMatrixObject( MatrixObject mo, InputInfo inputInfo ) {
+		return getRDDHandleForMatrixObject(mo, inputInfo, -1);
+	}
+
+	@SuppressWarnings("unchecked")
+	public JavaPairRDD<?,?> getRDDHandleForMatrixObject( MatrixObject mo, InputInfo inputInfo, int numParts ) {
 		//NOTE: MB this logic should be integrated into MatrixObject
 		//However, for now we cannot assume that spark libraries are
 		//always available and hence only store generic references in
@@ -366,7 +372,7 @@ public class SparkExecutionContext extends ExecutionContext
 			}
 			else { //default case
 				MatrixBlock mb = mo.acquireRead(); //pin matrix in memory
-				rdd = toMatrixJavaPairRDD(sc, mb, (int)mo.getNumRowsPerBlock(), (int)mo.getNumColumnsPerBlock());
+				rdd = toMatrixJavaPairRDD(sc, mb, (int)mo.getNumRowsPerBlock(), (int)mo.getNumColumnsPerBlock(), numParts);
 				mo.release(); //unpin matrix
 				_parRDDs.registerRDD(rdd.id(), OptimizerUtils.estimatePartitionedSizeExactSparsity(mc), true);
 			}
@@ -657,16 +663,11 @@ public class SparkExecutionContext extends ExecutionContext
 		obj.setRDDHandle( rddhandle );
 	}
 
-	/**
-	 * Utility method for creating an RDD out of an in-memory matrix block.
-	 *
-	 * @param sc java spark context
-	 * @param src matrix block
-	 * @param brlen block row length
-	 * @param bclen block column length
-	 * @return JavaPairRDD handle to matrix block
-	 */
 	public static JavaPairRDD<MatrixIndexes,MatrixBlock> toMatrixJavaPairRDD(JavaSparkContext sc, MatrixBlock src, int brlen, int bclen) {
+		return toMatrixJavaPairRDD(sc, src, brlen, bclen, -1);
+	}
+
+	public static JavaPairRDD<MatrixIndexes,MatrixBlock> toMatrixJavaPairRDD(JavaSparkContext sc, MatrixBlock src, int brlen, int bclen, int numParts) {
 		long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
 		List<Tuple2<MatrixIndexes,MatrixBlock>> list = null;
 
@@ -681,7 +682,9 @@ public class SparkExecutionContext extends ExecutionContext
 				.collect(Collectors.toList());
 		}
 
-		JavaPairRDD<MatrixIndexes,MatrixBlock> result = sc.parallelizePairs(list);
+		JavaPairRDD<MatrixIndexes,MatrixBlock> result = (numParts > 1) ?
+			sc.parallelizePairs(list, numParts) : sc.parallelizePairs(list);
+
 		if (DMLScript.STATISTICS) {
 			Statistics.accSparkParallelizeTime(System.nanoTime() - t0);
 			Statistics.incSparkParallelizeCount(1);

http://git-wip-us.apache.org/repos/asf/systemml/blob/c16738d2/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
index 8f8f576..125f359 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
@@ -97,8 +97,11 @@ public class MapmmSPInstruction extends BinarySPInstruction {
 		MatrixCharacteristics mcRdd = sec.getMatrixCharacteristics(rddVar);
 		MatrixCharacteristics mcBc = sec.getMatrixCharacteristics(bcastVar);
 
-		//get input rdd
-		JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = sec.getBinaryBlockRDDHandleForVariable(rddVar);
+		//get input rdd with preferred number of partitions to avoid unnecessary repartition
+		JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = sec.getBinaryBlockRDDHandleForVariable(rddVar,
+			(requiresFlatMapFunction(type, mcBc) && requiresRepartitioning(
+			type, mcRdd, mcBc, sec.getSparkContext().defaultParallelism())) ?
+			getNumRepartitioning(type, mcRdd, mcBc) : -1);
 
 		//investigate if a repartitioning - including a potential flip of broadcast and rdd
 		//inputs - is required to ensure moderately sized output partitions (2GB limitation)
@@ -216,7 +219,8 @@ public class MapmmSPInstruction extends BinarySPInstruction {
 		boolean isLargeOutput = (OptimizerUtils.estimatePartitionedSizeExactSparsity(isLeft?mcBc.getRows():mcRdd.getRows(),
 			isLeft?mcRdd.getCols():mcBc.getCols(), isLeft?mcBc.getRowsPerBlock():mcRdd.getRowsPerBlock(),
 			isLeft?mcRdd.getColsPerBlock():mcBc.getColsPerBlock(), 1.0) / numPartitions) > 1024*1024*1024;
-		return isOuter && isLargeOutput && mcRdd.dimsKnown() && mcBc.dimsKnown();
+		return isOuter && isLargeOutput && mcRdd.dimsKnown() && mcBc.dimsKnown()
+			&& numPartitions < getNumRepartitioning(type, mcRdd, mcBc);
 	}
 
 	/**
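
As a rough illustration of the sizing logic behind requiresRepartitioning
and getNumRepartitioning above - the helper name and formula below are
assumptions for illustration, not the SystemML implementation - the
preferred partition count keeps each output partition under a 1GB target
(safely below Spark's 2GB partition limitation), and repartitioning only
pays off if it actually increases the partition count:

public class PartitionSizingSketch {
	//1GB per-partition target, safely below Spark's 2GB limit
	private static final long MAX_PARTITION_BYTES = 1024L*1024*1024;

	//hypothetical helper: minimum number of partitions such that the
	//estimated output size per partition stays under the target
	public static int preferredNumPartitions(double estOutputBytes) {
		return (int) Math.ceil(estOutputBytes / MAX_PARTITION_BYTES);
	}

	public static void main(String[] args) {
		double estOutputBytes = 6.5e9; //e.g., a dense ~6.5GB output
		int defaultParallelism = 4;    //e.g., sc.defaultParallelism()
		int pref = preferredNumPartitions(estOutputBytes);
		//mirrors the added condition above: repartition (or, with this
		//patch, parallelize directly with the preferred count) only if
		//it increases the number of partitions
		boolean repartition = pref > defaultParallelism;
		//prints: preferred=7, repartition=true
		System.out.println("preferred=" + pref + ", repartition=" + repartition);
	}
}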
