[SYSTEMML-2185] Shuffle tuning spark binary matrix-matrix operations

This patch tunes the shuffle behavior of Spark binary matrix-matrix operations by setting the preferred number of partitions for the join. This operation has the characteristic that the shuffled intermediate is larger than the input/output. Hence, we now compute a conservative preferred number of partitions and explicitly enforce it on the join unless the input RDDs already carry a hash partitioning.
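In essence, the new code reuses an existing hash partitioning of either input if available, and otherwise caps the join's partition count at min(parts(in1) + parts(in2), 2 * preferred output partitions); see the diff below. The following is a minimal, self-contained sketch of the underlying Spark mechanism only, not SystemML code: the class name, toy data, and the hard-coded cap of 6 partitions are made up for illustration; the point is that passing an explicit partition count to join() controls the parallelism of the shuffled intermediate.

  import java.util.Arrays;
  import org.apache.spark.SparkConf;
  import org.apache.spark.api.java.JavaPairRDD;
  import org.apache.spark.api.java.JavaSparkContext;
  import scala.Tuple2;

  public class JoinPartitionSketch {
    public static void main(String[] args) {
      JavaSparkContext sc = new JavaSparkContext(
        new SparkConf().setAppName("JoinPartitionSketch").setMaster("local[2]"));

      // two toy pair RDDs standing in for the block RDDs of the two inputs
      JavaPairRDD<Integer,Double> in1 = sc.parallelizePairs(
        Arrays.asList(new Tuple2<>(1, 1.0), new Tuple2<>(2, 2.0)), 4);
      JavaPairRDD<Integer,Double> in2 = sc.parallelizePairs(
        Arrays.asList(new Tuple2<>(1, 3.0), new Tuple2<>(2, 4.0)), 4);

      // conservative partition count: bounded by the combined input partitions
      // and a cap (the patch uses 2 * SparkUtils.getNumPreferredPartitions(mcOut))
      int numPrefPart = Math.min(in1.getNumPartitions() + in2.getNumPartitions(), 6);

      // enforce the partition count on the join instead of relying on Spark's
      // default partitioner, then apply the element-wise binary operation
      JavaPairRDD<Integer,Double> out = in1
        .join(in2, numPrefPart)
        .mapValues(t -> t._1() + t._2());

      System.out.println(out.collect());
      sc.stop();
    }
  }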
For example, on stratstats 100K x 1K, this patch improved end-to-end
performance from 1100s to 740s.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/9db5c0ad
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/9db5c0ad
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/9db5c0ad

Branch: refs/heads/master
Commit: 9db5c0ad90e216985120630bafca068b7cf279a9
Parents: e1fd343
Author: Matthias Boehm <[email protected]>
Authored: Thu Mar 15 21:10:43 2018 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Thu Mar 15 21:10:43 2018 -0700

----------------------------------------------------------------------
 .../instructions/spark/BinarySPInstruction.java | 26 +++++++++++---------
 1 file changed, 15 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/9db5c0ad/src/main/java/org/apache/sysml/runtime/instructions/spark/BinarySPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/BinarySPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/BinarySPInstruction.java
index afcd7af..1560545 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/BinarySPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/BinarySPInstruction.java
@@ -36,6 +36,7 @@ import org.apache.sysml.runtime.instructions.spark.functions.MatrixScalarUnaryFu
 import org.apache.sysml.runtime.instructions.spark.functions.MatrixVectorBinaryOpPartitionFunction;
 import org.apache.sysml.runtime.instructions.spark.functions.OuterVectorBinaryOpFunction;
 import org.apache.sysml.runtime.instructions.spark.functions.ReplicateVectorFunction;
+import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
@@ -134,14 +135,14 @@ public abstract class BinarySPInstruction extends ComputationSPInstruction {
 		//sanity check dimensions
 		checkMatrixMatrixBinaryCharacteristics(sec);
+		updateBinaryOutputMatrixCharacteristics(sec);
 		
 		// Get input RDDs
-		String rddVar1 = input1.getName();
-		String rddVar2 = input2.getName();
-		JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = sec.getBinaryBlockRDDHandleForVariable( rddVar1 );
-		JavaPairRDD<MatrixIndexes,MatrixBlock> in2 = sec.getBinaryBlockRDDHandleForVariable( rddVar2 );
-		MatrixCharacteristics mc1 = sec.getMatrixCharacteristics( rddVar1 );
-		MatrixCharacteristics mc2 = sec.getMatrixCharacteristics( rddVar2 );
+		JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = sec.getBinaryBlockRDDHandleForVariable(input1.getName());
+		JavaPairRDD<MatrixIndexes,MatrixBlock> in2 = sec.getBinaryBlockRDDHandleForVariable(input2.getName());
+		MatrixCharacteristics mc1 = sec.getMatrixCharacteristics(input1.getName());
+		MatrixCharacteristics mc2 = sec.getMatrixCharacteristics(input2.getName());
+		MatrixCharacteristics mcOut = sec.getMatrixCharacteristics(output.getName());
 		
 		BinaryOperator bop = (BinaryOperator) _optr;
@@ -153,17 +154,20 @@ public abstract class BinarySPInstruction extends ComputationSPInstruction {
 			in1 = in1.flatMapToPair(new ReplicateVectorFunction(false, numRepLeft ));
 		if( numRepRight > 1 )
 			in2 = in2.flatMapToPair(new ReplicateVectorFunction(rowvector, numRepRight));
+		int numPrefPart = SparkUtils.isHashPartitioned(in1) ? in1.getNumPartitions() :
+			SparkUtils.isHashPartitioned(in2) ? in2.getNumPartitions() :
+			Math.min(in1.getNumPartitions() + in2.getNumPartitions(),
+				2 * SparkUtils.getNumPreferredPartitions(mcOut));
 		
 		//execute binary operation
 		JavaPairRDD<MatrixIndexes,MatrixBlock> out = in1
-			.join(in2)
-			.mapValues(new MatrixMatrixBinaryOpFunction(bop));
+			.join(in2, numPrefPart)
+			.mapValues(new MatrixMatrixBinaryOpFunction(bop));
 		
 		//set output RDD
-		updateBinaryOutputMatrixCharacteristics(sec);
 		sec.setRDDHandleForVariable(output.getName(), out);
-		sec.addLineageRDD(output.getName(), rddVar1);
-		sec.addLineageRDD(output.getName(), rddVar2);
+		sec.addLineageRDD(output.getName(), input1.getName());
+		sec.addLineageRDD(output.getName(), input2.getName());
 	}
 
 	protected void processMatrixBVectorBinaryInstruction(ExecutionContext ec, VectorType vtype)
