[SYSTEMML-2495] Adjust spark cumulative aggregate partitions

This patch improves the robustness of Spark cumulative aggregates by
adjusting the number of partitions for the intermediates of the forward
pass, because the size of this data can significantly shrink but also grow.
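
A minimal sketch of the partition-adjustment idea follows. The 128MB-per-partition
target and the estimatePreferredPartitions helper are assumptions made for this
sketch only; the actual logic lives in SparkUtils.getNumPreferredPartitions and
RDDAggregateUtils.mergeByKey, as shown in the diff below.

// Sketch only: estimatePreferredPartitions and the 128MB target are
// illustrative assumptions, not SystemML's actual SparkUtils heuristic.
public class CumAggPartitionSketch {

  // Size-based estimate: roughly one partition per 128MB of dense FP64 data.
  static int estimatePreferredPartitions(long rows, long cols) {
    double bytes = (double) rows * cols * 8;
    return (int) Math.max(1, Math.ceil(bytes / (128d * 1024 * 1024)));
  }

  // Mirrors the patch: the forward pass emits one aggregate row per row-block,
  // so the partition count is recomputed from the (much smaller) intermediate
  // size and bounded from below by the cluster's default parallelism,
  // capped at the number of output blocks.
  static int adjustedPartitions(long rlen, long clen, int brlen, int bclen,
      int defaultParallelism) {
    long outRows = (long) Math.ceil((double) rlen / brlen);
    long outBlocks = (long) Math.ceil((double) outRows / brlen)
        * (long) Math.ceil((double) clen / bclen);
    int numParts = estimatePreferredPartitions(outRows, clen);
    int minPar = (int) Math.min(defaultParallelism, outBlocks);
    return Math.max(numParts, minPar);
  }

  public static void main(String[] args) {
    // e.g., a 10M x 10 input with 1000x1000 blocks shrinks to 10K x 10 intermediates
    System.out.println(adjustedPartitions(10_000_000L, 10, 1000, 1000, 32));
  }
}

With these example inputs the intermediate fits in a handful of blocks, so the
lower bound derived from the default parallelism (capped at the number of
blocks) dominates and the merge runs with 10 partitions.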


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/0c4a3611
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/0c4a3611
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/0c4a3611

Branch: refs/heads/master
Commit: 0c4a3611c316cb13c7eaa94facd3446b34c1090e
Parents: 069863f
Author: Matthias Boehm <[email protected]>
Authored: Thu Sep 27 21:17:23 2018 +0200
Committer: Matthias Boehm <[email protected]>
Committed: Thu Sep 27 21:17:23 2018 +0200

----------------------------------------------------------------------
 .../spark/CumulativeAggregateSPInstruction.java       | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/0c4a3611/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java
index 8514acc..68cc6db 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java
@@ -32,6 +32,7 @@ import org.apache.sysml.runtime.functionobjects.PlusMultiply;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils;
+import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
@@ -59,9 +60,11 @@ public class CumulativeAggregateSPInstruction extends AggregateUnarySPInstruction
        public void processInstruction(ExecutionContext ec) {
                SparkExecutionContext sec = (SparkExecutionContext)ec;
                MatrixCharacteristics mc = sec.getMatrixCharacteristics(input1.getName());
+               MatrixCharacteristics mcOut = new MatrixCharacteristics(mc);
                long rlen = mc.getRows();
                int brlen = mc.getRowsPerBlock();
                int bclen = mc.getColsPerBlock();
+               mcOut.setRows((long)(Math.ceil((double)rlen/brlen)));
                
                //get input
                JavaPairRDD<MatrixIndexes,MatrixBlock> in = sec.getBinaryBlockRDDHandleForVariable( input1.getName() );
@@ -69,12 +72,17 @@ public class CumulativeAggregateSPInstruction extends AggregateUnarySPInstruction
                //execute unary aggregate (w/ implicit drop correction)
                AggregateUnaryOperator auop = (AggregateUnaryOperator) _optr;
                JavaPairRDD<MatrixIndexes,MatrixBlock> out = 
-                               in.mapToPair(new RDDCumAggFunction(auop, rlen, brlen, bclen));
-               out = RDDAggregateUtils.mergeByKey(out, false);
+                       in.mapToPair(new RDDCumAggFunction(auop, rlen, brlen, bclen));
+               //merge partial aggregates, adjusting for correct number of partitions
+               //as size can significantly shrink (1K) but also grow (sparse-dense)
+               int numParts = SparkUtils.getNumPreferredPartitions(mcOut);
+               int minPar = (int)Math.min(SparkExecutionContext.getDefaultParallelism(true), mcOut.getNumBlocks());
+               out = RDDAggregateUtils.mergeByKey(out, Math.max(numParts, minPar), false);
                
                //put output handle in symbol table
-               sec.setRDDHandleForVariable(output.getName(), out);     
+               sec.setRDDHandleForVariable(output.getName(), out);
                sec.addLineageRDD(output.getName(), input1.getName());
+               sec.getMatrixCharacteristics(output.getName()).set(mcOut);
        }
 
        private static class RDDCumAggFunction implements PairFunction<Tuple2<MatrixIndexes, MatrixBlock>, MatrixIndexes, MatrixBlock>
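
To make the shrink comment in the hunk above concrete (the input dimensions
below are hypothetical): the forward pass keeps one aggregate row per
row-block, so the intermediate has ceil(rlen/brlen) rows, which is exactly
what mcOut.setRows(...) records and why the partition count is recomputed.

// Hypothetical dimensions, for illustration only.
public class CumAggShrinkExample {
  public static void main(String[] args) {
    long rlen = 10_000_000L; // assumed input rows
    int brlen = 1000;        // SystemML's default block size
    long outRows = (long) Math.ceil((double) rlen / brlen);
    System.out.println("intermediate rows: " + outRows);        // 10000
    System.out.println("shrink factor:     " + rlen / outRows); // 1000, i.e. the "(1K)" in the comment
  }
}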
