[SYSTEMML-2501] Sparse aggregate communication spark cumagg ops This patch reduces the GC overhead of Spark cumulative aggregates (forward cascade) by communicating sparse aggregate blocks in target block sizes. For example, for 100 distributed sum(cumsum(X)) operations, it reduced the total runtime from 1,006s to 887s.
Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/fee20fb9 Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/fee20fb9 Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/fee20fb9 Branch: refs/heads/master Commit: fee20fb9b8c975bd3c3250ae7fe35c4904c0dc09 Parents: 77a7ef1 Author: Matthias Boehm <[email protected]> Authored: Sat Dec 1 19:22:19 2018 +0100 Committer: Matthias Boehm <[email protected]> Committed: Sat Dec 1 19:22:19 2018 +0100 ---------------------------------------------------------------------- .../spark/CumulativeAggregateSPInstruction.java | 4 +++- .../instructions/spark/utils/RDDAggregateUtils.java | 2 +- .../apache/sysml/runtime/matrix/data/MatrixBlock.java | 12 ++++++++---- 3 files changed, 12 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/fee20fb9/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java index a0dfb85..e8696da 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java @@ -137,7 +137,9 @@ public class CumulativeAggregateSPInstruction extends AggregateUnarySPInstructio int rlenBlk = IntUtils.toInt( Math.min(rlenOut-(rixOut-1)*_brlen, _brlen)); int clenBlk = blkOut.getNumColumns(); int posBlk = IntUtils.toInt((ixIn.getRowIndex()-1) % _brlen); - MatrixBlock blkOut2 = new MatrixBlock(rlenBlk, clenBlk, false); + + //construct sparse output blocks (single row in target block size) + 
MatrixBlock blkOut2 = new MatrixBlock(rlenBlk, clenBlk, true); blkOut2.copy(posBlk, posBlk, 0, clenBlk-1, blkOut, true); ixOut.setIndexes(rixOut, ixOut.getColumnIndex()); http://git-wip-us.apache.org/repos/asf/systemml/blob/fee20fb9/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java index 23b6ad9..0b01099 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java @@ -644,7 +644,7 @@ public class RDDAggregateUtils // execute merge (never pass by reference) MatrixBlock ret = _deep ? new MatrixBlock(b1) : b1; - ret.merge(b2, false); + ret.merge(b2, false, false, _deep); ret.examSparsity(); // sanity check output number of non-zeros http://git-wip-us.apache.org/repos/asf/systemml/blob/fee20fb9/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java index ae5ab84..1ff2f43 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java @@ -1623,10 +1623,14 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab * @param appendOnly ? 
*/ public void merge(MatrixBlock that, boolean appendOnly) { - merge(that, appendOnly, false); + merge(that, appendOnly, false, true); } public void merge(MatrixBlock that, boolean appendOnly, boolean par) { + merge(that, appendOnly, par, true); + } + + public void merge(MatrixBlock that, boolean appendOnly, boolean par, boolean deep) { //check for empty input source (nothing to merge) if( that == null || that.isEmptyBlock(false) ) return; @@ -1647,7 +1651,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab //core matrix block merge (guaranteed non-empty source/target, nnz maintenance not required) long nnz = nonZeros + that.nonZeros; if( sparse ) - mergeIntoSparse(that, appendOnly); + mergeIntoSparse(that, appendOnly, deep); else if( par ) mergeIntoDensePar(that); else @@ -1723,7 +1727,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab } } - private void mergeIntoSparse(MatrixBlock that, boolean appendOnly) { + private void mergeIntoSparse(MatrixBlock that, boolean appendOnly, boolean deep) { SparseBlock a = sparseBlock; final boolean COO = (a instanceof SparseBlockCOO); final int m = rlen; @@ -1734,7 +1738,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab if( b.isEmpty(i) ) continue; if( !COO && a.isEmpty(i) ) { //copy entire sparse row (no sort required) - a.set(i, b.get(i), true); + a.set(i, b.get(i), deep); } else { boolean appended = false;
