Repository: systemml Updated Branches: refs/heads/master 1634239ce -> f86879bd0
[SYSTEMML-1920] Shuffle-free Spark binary reblock for aligned blocks
This patch makes the existing Spark binary reblock instruction more adaptive. If the source and target block sizes are aligned, i.e., output blocks can be constructed in a 1:N manner, we now avoid the unnecessary block aggregation that causes a shuffle of the entire matrix.
Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/119893f1 Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/119893f1 Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/119893f1 Branch: refs/heads/master Commit: 119893f11c88aaedc0ee6f06f6a2cad72b842cfb Parents: 1634239 Author: Matthias Boehm <mboe...@gmail.com> Authored: Sun Sep 17 15:05:40 2017 -0700 Committer: Matthias Boehm <mboe...@gmail.com> Committed: Sun Sep 17 16:48:53 2017 -0700 ---------------------------------------------------------------------- .../runtime/instructions/spark/ReblockSPInstruction.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/119893f1/src/main/java/org/apache/sysml/runtime/instructions/spark/ReblockSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/ReblockSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/ReblockSPInstruction.java index f97032b..0839c5b 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/ReblockSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/ReblockSPInstruction.java @@ -174,9 +174,14 @@ public class ReblockSPInstruction extends UnarySPInstruction { //BINARY BLOCK <- BINARY BLOCK (different sizes) JavaPairRDD<MatrixIndexes, MatrixBlock> in1 = sec.getBinaryBlockRDDHandleForVariable(input1.getName()); - 
JavaPairRDD<MatrixIndexes, MatrixBlock> out = - in1.flatMapToPair(new ExtractBlockForBinaryReblock(mc, mcOut)); - out = RDDAggregateUtils.mergeByKey(out, false); + boolean shuffleFreeReblock = mc.dimsKnown() && mcOut.dimsKnown() + && (mc.getRows() < mcOut.getRowsPerBlock() || mc.getRowsPerBlock()%mcOut.getRowsPerBlock() == 0) + && (mc.getCols() < mcOut.getColsPerBlock() || mc.getColsPerBlock()%mcOut.getColsPerBlock() == 0); + + JavaPairRDD<MatrixIndexes, MatrixBlock> out = in1 + .flatMapToPair(new ExtractBlockForBinaryReblock(mc, mcOut)); + if( !shuffleFreeReblock ) + out = RDDAggregateUtils.mergeByKey(out, false); //put output RDD handle into symbol table sec.setRDDHandleForVariable(output.getName(), out);