[SYSTEMML-2185] Shuffle tuning spark binary matrix-matrix operations

This patch tunes the shuffle behavior of Spark binary matrix-matrix
operations by setting the preferred number of partitions for the join.
This operation has the characteristic that the shuffled intermediate is
larger than the inputs and output. Hence, we now compute a conservative
preferred number of partitions and explicitly enforce it, unless one of
the input RDDs already carries a hash partitioning, in which case that
RDD's existing number of partitions is reused.

For example, on stratstats 100K x 1K, this patch reduced the end-to-end
runtime from 1100s to 740s.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/9db5c0ad
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/9db5c0ad
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/9db5c0ad

Branch: refs/heads/master
Commit: 9db5c0ad90e216985120630bafca068b7cf279a9
Parents: e1fd343
Author: Matthias Boehm <[email protected]>
Authored: Thu Mar 15 21:10:43 2018 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Thu Mar 15 21:10:43 2018 -0700

----------------------------------------------------------------------
 .../instructions/spark/BinarySPInstruction.java | 26 +++++++++++---------
 1 file changed, 15 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/9db5c0ad/src/main/java/org/apache/sysml/runtime/instructions/spark/BinarySPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/BinarySPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/BinarySPInstruction.java
index afcd7af..1560545 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/BinarySPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/BinarySPInstruction.java
@@ -36,6 +36,7 @@ import 
org.apache.sysml.runtime.instructions.spark.functions.MatrixScalarUnaryFu
 import 
org.apache.sysml.runtime.instructions.spark.functions.MatrixVectorBinaryOpPartitionFunction;
 import 
org.apache.sysml.runtime.instructions.spark.functions.OuterVectorBinaryOpFunction;
 import 
org.apache.sysml.runtime.instructions.spark.functions.ReplicateVectorFunction;
+import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
@@ -134,14 +135,14 @@ public abstract class BinarySPInstruction extends 
ComputationSPInstruction {
                
                //sanity check dimensions
                checkMatrixMatrixBinaryCharacteristics(sec);
+               updateBinaryOutputMatrixCharacteristics(sec);
                
                // Get input RDDs
-               String rddVar1 = input1.getName();
-               String rddVar2 = input2.getName();
-               JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = 
sec.getBinaryBlockRDDHandleForVariable( rddVar1 );
-               JavaPairRDD<MatrixIndexes,MatrixBlock> in2 = 
sec.getBinaryBlockRDDHandleForVariable( rddVar2 );
-               MatrixCharacteristics mc1 = sec.getMatrixCharacteristics( 
rddVar1 );
-               MatrixCharacteristics mc2 = sec.getMatrixCharacteristics( 
rddVar2 );
+               JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = 
sec.getBinaryBlockRDDHandleForVariable(input1.getName());
+               JavaPairRDD<MatrixIndexes,MatrixBlock> in2 = 
sec.getBinaryBlockRDDHandleForVariable(input2.getName());
+               MatrixCharacteristics mc1 = 
sec.getMatrixCharacteristics(input1.getName());
+               MatrixCharacteristics mc2 = 
sec.getMatrixCharacteristics(input2.getName());
+               MatrixCharacteristics mcOut = 
sec.getMatrixCharacteristics(output.getName());
                
                BinaryOperator bop = (BinaryOperator) _optr;
        
@@ -153,17 +154,20 @@ public abstract class BinarySPInstruction extends 
ComputationSPInstruction {
                        in1 = in1.flatMapToPair(new 
ReplicateVectorFunction(false, numRepLeft ));
                if( numRepRight > 1 )
                        in2 = in2.flatMapToPair(new 
ReplicateVectorFunction(rowvector, numRepRight));
+               int numPrefPart = SparkUtils.isHashPartitioned(in1) ? 
in1.getNumPartitions() :
+                       SparkUtils.isHashPartitioned(in2) ? 
in2.getNumPartitions() :
+                       Math.min(in1.getNumPartitions() + 
in2.getNumPartitions(),
+                               2 * 
SparkUtils.getNumPreferredPartitions(mcOut));
                
                //execute binary operation
                JavaPairRDD<MatrixIndexes,MatrixBlock> out = in1
-                               .join(in2)
-                               .mapValues(new 
MatrixMatrixBinaryOpFunction(bop));
+                       .join(in2, numPrefPart)
+                       .mapValues(new MatrixMatrixBinaryOpFunction(bop));
                
                //set output RDD
-               updateBinaryOutputMatrixCharacteristics(sec);
                sec.setRDDHandleForVariable(output.getName(), out);
-               sec.addLineageRDD(output.getName(), rddVar1);
-               sec.addLineageRDD(output.getName(), rddVar2);
+               sec.addLineageRDD(output.getName(), input1.getName());
+               sec.addLineageRDD(output.getName(), input2.getName());
        }
 
        protected void processMatrixBVectorBinaryInstruction(ExecutionContext 
ec, VectorType vtype) 

Reply via email to