[SYSTEMML-2495] Adjust spark cumulative aggregate partitions

This patch improves the robustness of spark cumulative aggregates by adjusting the number of partitions for the intermediates of the forward pass, because this data size can significantly shrink but also grow.
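For intuition, the heuristic introduced here can be summarized as: estimate the number of partitions from the size of the (shrunken) intermediate, but never drop below the minimum of the default parallelism and the number of blocks. The following sketch is illustrative only; the class name, the 128MB target partition size, and the dense 8-bytes-per-cell estimate are assumptions, not the actual internals of SparkUtils.getNumPreferredPartitions or SparkExecutionContext.

// Hypothetical sketch of the partitioning heuristic (assumed names and
// constants; not the actual SystemML utility implementations).
public class PartitionHeuristicSketch {
    // Assumed target partition size, in line with a typical HDFS block size.
    static final long TARGET_PARTITION_SIZE = 128L * 1024 * 1024;

    // Size-based estimate: partitions needed for a dense representation
    // at 8 bytes per cell.
    static int numPreferredPartitions(long rows, long cols) {
        double denseSize = 8d * rows * cols;
        return (int) Math.max(Math.ceil(denseSize / TARGET_PARTITION_SIZE), 1);
    }

    // Combined heuristic: take the size-based estimate, but never drop
    // below min(default parallelism, number of blocks), so that shrunken
    // intermediates that may grow again (e.g., sparse-to-dense) keep
    // enough parallelism.
    static int numPartitions(long rows, long cols, long numBlocks, int defaultPar) {
        int numParts = numPreferredPartitions(rows, cols);
        int minPar = (int) Math.min(defaultPar, numBlocks);
        return Math.max(numParts, minPar);
    }
}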
Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/0c4a3611
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/0c4a3611
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/0c4a3611

Branch: refs/heads/master
Commit: 0c4a3611c316cb13c7eaa94facd3446b34c1090e
Parents: 069863f
Author: Matthias Boehm <[email protected]>
Authored: Thu Sep 27 21:17:23 2018 +0200
Committer: Matthias Boehm <[email protected]>
Committed: Thu Sep 27 21:17:23 2018 +0200

----------------------------------------------------------------------
 .../spark/CumulativeAggregateSPInstruction.java | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/systemml/blob/0c4a3611/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java
index 8514acc..68cc6db 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java
@@ -32,6 +32,7 @@ import org.apache.sysml.runtime.functionobjects.PlusMultiply;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils;
+import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
@@ -59,9 +60,11 @@ public class CumulativeAggregateSPInstruction extends AggregateUnarySPInstructio
 	public void processInstruction(ExecutionContext ec) {
 		SparkExecutionContext sec = (SparkExecutionContext)ec;
 		MatrixCharacteristics mc = sec.getMatrixCharacteristics(input1.getName());
+		MatrixCharacteristics mcOut = new MatrixCharacteristics(mc);
 		long rlen = mc.getRows();
 		int brlen = mc.getRowsPerBlock();
 		int bclen = mc.getColsPerBlock();
+		mcOut.setRows((long)(Math.ceil((double)rlen/brlen)));
 		
 		//get input
 		JavaPairRDD<MatrixIndexes,MatrixBlock> in = sec.getBinaryBlockRDDHandleForVariable( input1.getName() );
@@ -69,12 +72,17 @@ public class CumulativeAggregateSPInstruction extends AggregateUnarySPInstructio
 		//execute unary aggregate (w/ implicit drop correction)
 		AggregateUnaryOperator auop = (AggregateUnaryOperator) _optr;
 		JavaPairRDD<MatrixIndexes,MatrixBlock> out = 
-			in.mapToPair(new RDDCumAggFunction(auop, rlen, brlen, bclen));
-		out = RDDAggregateUtils.mergeByKey(out, false);
+			in.mapToPair(new RDDCumAggFunction(auop, rlen, brlen, bclen));
+		//merge partial aggregates, adjusting for correct number of partitions
+		//as size can significantly shrink (1K) but also grow (sparse-dense)
+		int numParts = SparkUtils.getNumPreferredPartitions(mcOut);
+		int minPar = (int)Math.min(SparkExecutionContext.getDefaultParallelism(true), mcOut.getNumBlocks());
+		out = RDDAggregateUtils.mergeByKey(out, Math.max(numParts, minPar), false);
 		
 		//put output handle in symbol table
-		sec.setRDDHandleForVariable(output.getName(), out);	
+		sec.setRDDHandleForVariable(output.getName(), out);
 		sec.addLineageRDD(output.getName(), input1.getName());
+		sec.getMatrixCharacteristics(output.getName()).set(mcOut);
 	}
 	
 	private static class RDDCumAggFunction implements PairFunction<Tuple2<MatrixIndexes, MatrixBlock>, MatrixIndexes, MatrixBlock>
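A worked example with assumed values, reusing the hypothetical PartitionHeuristicSketch above: a 10M x 10 input with 1000 x 1000 blocks shrinks by ~1000x in the forward pass, so a purely size-based estimate would collapse the merge to a single partition, while the lower bound keeps it at 10.

public class PartitionHeuristicExample {
    public static void main(String[] args) {
        long rlen = 10_000_000, clen = 10, brlen = 1000, bclen = 1000;
        // The forward pass keeps one aggregate row per row-block,
        // i.e., a ~1000x shrink in rows.
        long interRows = (long) Math.ceil((double) rlen / brlen); // 10,000
        long numBlocks = (long) Math.ceil((double) interRows / brlen)
            * (long) Math.ceil((double) clen / bclen); // 10 * 1 = 10 blocks
        // The size-based estimate alone yields 1 partition (10,000 x 10
        // doubles is under 1 MB); the guard keeps min(32, 10) = 10.
        int parts = PartitionHeuristicSketch.numPartitions(interRows, clen, numBlocks, 32);
        System.out.println(parts); // 10
    }
}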
