Repository: systemml
Updated Branches:
  refs/heads/master 5f6748fda -> 437e9d661


[SYSTEMML-2177] Performance spark cpmm (set join parallelism)

This patch makes a minor performance improvement to spark cpmm
operations by computing and setting the preferred degree of parallelism
according to data and cluster characteristics. Since cpmm anyway changes
keys before the join, the changed number of partitions does not affect a
potentially existing partitioner.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/437e9d66
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/437e9d66
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/437e9d66

Branch: refs/heads/master
Commit: 437e9d6618b197f07a70b35df32dae5b6bb301a4
Parents: 5f6748f
Author: Matthias Boehm <[email protected]>
Authored: Thu Mar 8 18:25:43 2018 -0800
Committer: Matthias Boehm <[email protected]>
Committed: Thu Mar 8 18:25:43 2018 -0800

----------------------------------------------------------------------
 .../instructions/spark/CpmmSPInstruction.java   | 33 +++++++++++++++++---
 1 file changed, 29 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/437e9d66/src/main/java/org/apache/sysml/runtime/instructions/spark/CpmmSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/CpmmSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/CpmmSPInstruction.java
index c51a6ee..adac2fc 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/CpmmSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/CpmmSPInstruction.java
@@ -34,6 +34,8 @@ import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import 
org.apache.sysml.runtime.instructions.spark.functions.FilterNonEmptyBlocksFunction;
 import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils;
+import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils;
+import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue;
@@ -83,20 +85,27 @@ public class CpmmSPInstruction extends BinarySPInstruction {
                SparkExecutionContext sec = (SparkExecutionContext)ec;
                
                //get rdd inputs
-               JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = 
sec.getBinaryBlockRDDHandleForVariable( input1.getName() );
-               JavaPairRDD<MatrixIndexes,MatrixBlock> in2 = 
sec.getBinaryBlockRDDHandleForVariable( input2.getName() );
+               JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = 
sec.getBinaryBlockRDDHandleForVariable(input1.getName());
+               JavaPairRDD<MatrixIndexes,MatrixBlock> in2 = 
sec.getBinaryBlockRDDHandleForVariable(input2.getName());
+               MatrixCharacteristics mc1 = 
sec.getMatrixCharacteristics(input1.getName());
+               MatrixCharacteristics mc2 = 
sec.getMatrixCharacteristics(input2.getName());
+               
                if( _aggtype == SparkAggType.SINGLE_BLOCK ) {
                        //prune empty blocks of ultra-sparse matrices
                        in1 = in1.filter(new FilterNonEmptyBlocksFunction());
                        in2 = in2.filter(new FilterNonEmptyBlocksFunction());
                }
                
+               //compute preferred join degree of parallelism
+               int numPreferred = getPreferredParJoin(mc1, mc2, 
in1.getNumPartitions(), in2.getNumPartitions());
+               int numPartJoin = Math.min(getMaxParJoin(mc1, mc2), 
numPreferred);
+               
                //process core cpmm matrix multiply 
                JavaPairRDD<Long, IndexedMatrixValue> tmp1 = in1.mapToPair(new 
CpmmIndexFunction(true));
                JavaPairRDD<Long, IndexedMatrixValue> tmp2 = in2.mapToPair(new 
CpmmIndexFunction(false));
                JavaPairRDD<MatrixIndexes,MatrixBlock> out = tmp1
-                       .join(tmp2)                              // join over 
common dimension
-                       .mapToPair(new CpmmMultiplyFunction());  // compute 
block multiplications
+                       .join(tmp2, numPartJoin)                // join over 
common dimension
+                       .mapToPair(new CpmmMultiplyFunction()); // compute 
block multiplications
                
                //process cpmm aggregation and handle outputs
                if( _aggtype == SparkAggType.SINGLE_BLOCK ) {
@@ -120,6 +129,22 @@ public class CpmmSPInstruction extends BinarySPInstruction 
{
                        updateBinaryMMOutputMatrixCharacteristics(sec, true);
                }
        }
+       
+       private static int getPreferredParJoin(MatrixCharacteristics mc1, 
MatrixCharacteristics mc2, int numPar1, int numPar2) {
+               int defPar = SparkExecutionContext.getDefaultParallelism(true);
+               int maxParIn = Math.max(numPar1, numPar2);
+               int maxSizeIn = SparkUtils.getNumPreferredPartitions(mc1) +
+                       SparkUtils.getNumPreferredPartitions(mc2);
+               int tmp = (mc1.dimsKnown(true) && mc2.dimsKnown(true)) ? 
+                       Math.max(maxSizeIn, maxParIn) : maxParIn;
+               return (tmp > defPar/2) ? Math.max(tmp, defPar) : tmp;
+       }
+       
+       private static int getMaxParJoin(MatrixCharacteristics mc1, 
MatrixCharacteristics mc2) {
+               return mc1.colsKnown() ? (int)mc1.getNumColBlocks() :
+                       mc2.rowsKnown() ? (int)mc2.getNumRowBlocks() :
+                       Integer.MAX_VALUE;
+       }
 
        private static class CpmmIndexFunction implements 
PairFunction<Tuple2<MatrixIndexes, MatrixBlock>, Long, IndexedMatrixValue>
        {

Reply via email to