Repository: incubator-systemml
Updated Branches:
  refs/heads/master ccdb61fd6 -> b8de68b74


[SYSTEMML-1593] Performance spark rexpand op (load balance, no shuffle)

This patch fixes performance issues of Spark rexpand operations, which
performed poorly for ultra-sparse outputs. The reasons were (1) load
imbalance when a small input vector creates a huge ultra-sparse matrix,
and (2) an unnecessary shuffle of the large output matrix. Consider the
following scenario, where v is a 20M x 1 vector:

FK = table(seq(1,nrow(v)), v, nrow(v), 1e6)
print("Sum of FK = " + sum(FK))
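
The script above uses the one-hot pattern that this commit refers to as rexpand: row i of FK has a single 1 at column v[i]. The following Java sketch illustrates those cell-level semantics only. It is not SystemML code; the class and method names are hypothetical, and skipping out-of-range values is a simplification made for this sketch.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, cell-level illustration (not SystemML code) of
 * FK = table(seq(1,nrow(v)), v, nrow(v), maxVal): an nrow(v) x maxVal
 * 0/1 matrix with a single 1 per row, located at column v[i].
 */
public class RExpandSemantics {

    /** Returns the 1-based (row, col) coordinates of all non-zero (= 1) cells. */
    public static List<long[]> rexpandRows(double[] v, long maxVal) {
        List<long[]> nnz = new ArrayList<>();
        for (int i = 0; i < v.length; i++) {
            long col = (long) v[i];           // truncate to an integer column index
            if (col < 1 || col > maxVal)      // sketch only: skip out-of-range values
                continue;
            nnz.add(new long[] {i + 1, col}); // FK[i+1, v[i]] = 1
        }
        return nnz;
    }

    public static void main(String[] args) {
        double[] v = {3, 1, 2, 3};
        List<long[]> cells = rexpandRows(v, 5);
        for (long[] c : cells)
            System.out.println("FK[" + c[0] + "," + c[1] + "] = 1");
        // sum(FK) equals the number of in-range entries of v
        System.out.println("sum(FK) = " + cells.size());
    }
}
```

With every value of v in range, sum(FK) equals nrow(v), which is why the print statement is a cheap way to force the whole output to be computed.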

The input is just 160MB (20M dense 8-byte values) and is hence read as
only two partitions from HDFS. However, the output is large due to its
blocked representation, which leads to load imbalance because all of it
is generated by the few tasks of those two partitions. This patch
improves the runtime of the above scenario on a 1+6 node cluster from
794s to 49s (incl. 30s for Spark context creation).
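
To make the imbalance concrete, the following back-of-the-envelope Java sketch computes the input size and block counts for this scenario. It assumes 1000 x 1000 blocks (brlen = bclen = 1000) and two equally sized input partitions; both are assumptions for illustration, and whether empty output blocks are actually materialized depends on the implementation.

```java
/**
 * Back-of-the-envelope sizes for the rexpand scenario.
 * Assumptions: 1000 x 1000 blocks, dense 8-byte doubles for the input,
 * and two equally sized input partitions.
 */
public class RExpandSizes {

    /** Ceiling division for positive longs. */
    public static long ceilDiv(long a, long b) {
        return (a + b - 1) / b;
    }

    public static void main(String[] args) {
        long rows = 20_000_000L;   // nrow(v)
        long maxVal = 1_000_000L;  // number of output columns (1e6)
        long blk = 1000;           // assumed block size
        long inputPartitions = 2;  // as reported for the 160MB input

        long inputBytes = rows * 8;             // 160,000,000 bytes
        long inBlocks = ceilDiv(rows, blk);     // 20,000 input blocks (1000 x 1)
        long outPerIn = ceilDiv(maxVal, blk);   // 1,000 output blocks per input block
        long outBlocks = inBlocks * outPerIn;   // 20,000,000 output blocks

        System.out.println("input bytes:            " + inputBytes);
        System.out.println("input blocks:           " + inBlocks);
        System.out.println("output blocks/in block: " + outPerIn);
        System.out.println("output blocks:          " + outBlocks);
        System.out.println("output blocks per task: " + outBlocks / inputPartitions);
        // each 1000 x 1000 output block holds at most 1000 non-zeros (one per row)
    }
}
```

Under these assumptions, each of the two tasks is responsible for roughly 10 million mostly empty output blocks, while the rest of the cluster sits idle.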


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/b8de68b7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/b8de68b7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/b8de68b7

Branch: refs/heads/master
Commit: b8de68b74d2002fe4a232a7e77a28bfd121ca4eb
Parents: ccdb61f
Author: Matthias Boehm <[email protected]>
Authored: Mon May 8 17:01:24 2017 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Mon May 8 17:03:50 2017 -0700

----------------------------------------------------------------------
 .../spark/ParameterizedBuiltinSPInstruction.java        | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b8de68b7/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
index 5c27f60..2a80cfc 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
@@ -411,10 +411,18 @@ public class ParameterizedBuiltinSPInstruction  extends ComputationSPInstruction
                        long brlen = mcIn.getRowsPerBlock();
                        long bclen = mcIn.getColsPerBlock();
                        
-                       //execute remove empty rows/cols operation
+                       //repartition input vector for higher degree of parallelism 
+                       //(avoid scenarios where few input partitions create huge outputs)
+                       MatrixCharacteristics mcTmp = new MatrixCharacteristics(dirRows?lmaxVal:mcIn.getRows(), 
+                                       dirRows?mcIn.getRows():lmaxVal, (int)brlen, (int)bclen, mcIn.getRows());
+                       int numParts = (int)Math.min(SparkUtils.getNumPreferredPartitions(mcTmp, in), mcIn.getNumBlocks());
+                       if( numParts > in.getNumPartitions()*2 )
+                               in = in.repartition(numParts);
+                       
+                       //execute rexpand rows/cols operation (no shuffle required because outputs are
+                       //block-aligned with the input, i.e., one input block generates n output blocks)
                        JavaPairRDD<MatrixIndexes,MatrixBlock> out = in
                                        .flatMapToPair(new RDDRExpandFunction(maxVal, dirRows, cast, ignore, brlen, bclen));            
-                       out = RDDAggregateUtils.mergeByKey(out, false);
                        
                        //store output rdd handle
                        sec.setRDDHandleForVariable(output.getName(), out);
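
The repartitioning decision in the patch can be sketched in isolation as follows. This is a hypothetical Java sketch: preferredPartitions() is a simplified stand-in for SystemML's SparkUtils.getNumPreferredPartitions, whose actual logic is not reproduced here, and the 128MB target partition size is an assumption. Only the min-with-block-count cap and the "more than double" threshold mirror the diff.

```java
/**
 * Hypothetical sketch of the partitioning decision added by the patch.
 * preferredPartitions() is a simplified stand-in for
 * SparkUtils.getNumPreferredPartitions; the 128MB target is an assumption.
 */
public class RepartitionHeuristic {

    static final long TARGET_PARTITION_BYTES = 128L * 1024 * 1024; // assumed target size

    /** One partition per TARGET_PARTITION_BYTES of estimated output, at least one. */
    public static long preferredPartitions(long estOutputBytes) {
        return Math.max(1, (estOutputBytes + TARGET_PARTITION_BYTES - 1) / TARGET_PARTITION_BYTES);
    }

    /**
     * Returns the partition count to use for the input vector: the preferred
     * count, capped at the number of input blocks (a block cannot be split),
     * and applied only if it more than doubles the current parallelism.
     */
    public static int choosePartitions(long estOutputBytes, long numInputBlocks, int currentPartitions) {
        int numParts = (int) Math.min(preferredPartitions(estOutputBytes), numInputBlocks);
        return numParts > currentPartitions * 2 ? numParts : currentPartitions;
    }

    public static void main(String[] args) {
        // Large estimated output, 20,000 input blocks, 2 partitions: repartition
        System.out.println(choosePartitions(100 * TARGET_PARTITION_BYTES, 20_000, 2));
        // Small estimated output: keep the 2 existing partitions
        System.out.println(choosePartitions(3 * TARGET_PARTITION_BYTES, 20_000, 2));
        // Cap: never more partitions than input blocks
        System.out.println(choosePartitions(100 * TARGET_PARTITION_BYTES, 5, 2));
    }
}
```

The design choice is to shuffle the small input vector (160MB in the scenario) rather than the huge output: repartitioning the input spreads the output generation over many tasks, and the 2x threshold avoids paying for a repartition that would barely change the parallelism.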

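The mergeByKey call could be dropped because, for a column vector expanded along columns (as in the scenario above), every output block inherits the row-block index of the input block it came from. Hence no two input blocks emit the same output block key, and a merge would find nothing to combine. The following Java sketch illustrates this under simplifying assumptions: string keys stand in for MatrixIndexes, per-block non-zero counts stand in for MatrixBlock contents, only non-empty blocks are emitted, and all names are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical sketch of block-aligned rexpand: one input block of a column
 * vector (row-block index rowBlock) expands into output blocks that all share
 * that row-block index, so outputs of different input blocks never collide.
 */
public class BlockAlignedRExpand {

    /** Output block key: 1-based (rowBlock, colBlock), encoded as a string. */
    static String key(long rowBlock, long colBlock) {
        return rowBlock + "," + colBlock;
    }

    /** Returns the number of non-zeros per (non-empty) output block key. */
    public static Map<String, Integer> expandBlock(long rowBlock, double[] vals, long maxVal, int bclen) {
        Map<String, Integer> out = new TreeMap<>();
        for (double d : vals) {
            long col = (long) d;                    // 1-based output column
            if (col < 1 || col > maxVal) continue;  // sketch only: skip out-of-range values
            long colBlock = (col - 1) / bclen + 1;  // 1-based column-block index
            out.merge(key(rowBlock, colBlock), 1, Integer::sum);
        }
        return out;
    }

    public static void main(String[] args) {
        int bclen = 2;
        Map<String, Integer> b1 = expandBlock(1, new double[] {1, 4, 3}, 5, bclen);
        Map<String, Integer> b2 = expandBlock(2, new double[] {5, 2, 2}, 5, bclen);
        System.out.println(b1); // {1,1=1, 1,2=2}
        System.out.println(b2); // {2,1=2, 2,3=1}
        // disjoint key sets: a merge-by-key step would have nothing to combine
        boolean disjoint = b1.keySet().stream().noneMatch(b2::containsKey);
        System.out.println("disjoint keys: " + disjoint); // true
    }
}
```

Because each input block is self-contained in this sense, a plain flatMapToPair produces the final output, and the previous shuffle of the large output RDD becomes unnecessary.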