Repository: systemml Updated Branches: refs/heads/master 5f6748fda -> 437e9d661
[SYSTEMML-2177] Performance spark cpmm (set join parallelism) This patch makes a minor performance improvement to spark cpmm operations by computing and setting the preferred degree of parallelism according to data and cluster characteristics. Since cpmm anyway changes keys before the join, the changed number of partitions does not affect a potentially existing partitioner. Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/437e9d66 Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/437e9d66 Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/437e9d66 Branch: refs/heads/master Commit: 437e9d6618b197f07a70b35df32dae5b6bb301a4 Parents: 5f6748f Author: Matthias Boehm <[email protected]> Authored: Thu Mar 8 18:25:43 2018 -0800 Committer: Matthias Boehm <[email protected]> Committed: Thu Mar 8 18:25:43 2018 -0800 ---------------------------------------------------------------------- .../instructions/spark/CpmmSPInstruction.java | 33 +++++++++++++++++--- 1 file changed, 29 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/437e9d66/src/main/java/org/apache/sysml/runtime/instructions/spark/CpmmSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CpmmSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CpmmSPInstruction.java index c51a6ee..adac2fc 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CpmmSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CpmmSPInstruction.java @@ -34,6 +34,8 @@ import org.apache.sysml.runtime.instructions.InstructionUtils; import org.apache.sysml.runtime.instructions.cp.CPOperand; import org.apache.sysml.runtime.instructions.spark.functions.FilterNonEmptyBlocksFunction; 
import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils; +import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils; +import org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.MatrixIndexes; import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue; @@ -83,20 +85,27 @@ public class CpmmSPInstruction extends BinarySPInstruction { SparkExecutionContext sec = (SparkExecutionContext)ec; //get rdd inputs - JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = sec.getBinaryBlockRDDHandleForVariable( input1.getName() ); - JavaPairRDD<MatrixIndexes,MatrixBlock> in2 = sec.getBinaryBlockRDDHandleForVariable( input2.getName() ); + JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = sec.getBinaryBlockRDDHandleForVariable(input1.getName()); + JavaPairRDD<MatrixIndexes,MatrixBlock> in2 = sec.getBinaryBlockRDDHandleForVariable(input2.getName()); + MatrixCharacteristics mc1 = sec.getMatrixCharacteristics(input1.getName()); + MatrixCharacteristics mc2 = sec.getMatrixCharacteristics(input2.getName()); + if( _aggtype == SparkAggType.SINGLE_BLOCK ) { //prune empty blocks of ultra-sparse matrices in1 = in1.filter(new FilterNonEmptyBlocksFunction()); in2 = in2.filter(new FilterNonEmptyBlocksFunction()); } + //compute preferred join degree of parallelism + int numPreferred = getPreferredParJoin(mc1, mc2, in1.getNumPartitions(), in2.getNumPartitions()); + int numPartJoin = Math.min(getMaxParJoin(mc1, mc2), numPreferred); + //process core cpmm matrix multiply JavaPairRDD<Long, IndexedMatrixValue> tmp1 = in1.mapToPair(new CpmmIndexFunction(true)); JavaPairRDD<Long, IndexedMatrixValue> tmp2 = in2.mapToPair(new CpmmIndexFunction(false)); JavaPairRDD<MatrixIndexes,MatrixBlock> out = tmp1 - .join(tmp2) // join over common dimension - .mapToPair(new CpmmMultiplyFunction()); // compute block multiplications + .join(tmp2, numPartJoin) // join over common dimension + 
.mapToPair(new CpmmMultiplyFunction()); // compute block multiplications //process cpmm aggregation and handle outputs if( _aggtype == SparkAggType.SINGLE_BLOCK ) { @@ -120,6 +129,22 @@ public class CpmmSPInstruction extends BinarySPInstruction { updateBinaryMMOutputMatrixCharacteristics(sec, true); } } + + private static int getPreferredParJoin(MatrixCharacteristics mc1, MatrixCharacteristics mc2, int numPar1, int numPar2) { + int defPar = SparkExecutionContext.getDefaultParallelism(true); + int maxParIn = Math.max(numPar1, numPar2); + int maxSizeIn = SparkUtils.getNumPreferredPartitions(mc1) + + SparkUtils.getNumPreferredPartitions(mc2); + int tmp = (mc1.dimsKnown(true) && mc2.dimsKnown(true)) ? + Math.max(maxSizeIn, maxParIn) : maxParIn; + return (tmp > defPar/2) ? Math.max(tmp, defPar) : tmp; + } + + private static int getMaxParJoin(MatrixCharacteristics mc1, MatrixCharacteristics mc2) { + return mc1.colsKnown() ? (int)mc1.getNumColBlocks() : + mc2.rowsKnown() ? (int)mc2.getNumRowBlocks() : + Integer.MAX_VALUE; + } private static class CpmmIndexFunction implements PairFunction<Tuple2<MatrixIndexes, MatrixBlock>, Long, IndexedMatrixValue> {
