[SYSTEMML-2501] Sparse aggregate communication in Spark cumagg ops

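This patch reduces the GC overhead of Spark cumulative aggregates (forward
cascade) by communicating the aggregate blocks as sparse blocks of the target
block size. It also adds a shallow (non-deep) merge option to
MatrixBlock.merge, so that non-deep aggregations can reuse sparse rows instead
of copying them. For example, for 100 distributed sum(cumsum(X)) operations,
this reduced the total runtime from 1,006s to 887s.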

Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/fee20fb9
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/fee20fb9
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/fee20fb9

Branch: refs/heads/master
Commit: fee20fb9b8c975bd3c3250ae7fe35c4904c0dc09
Parents: 77a7ef1
Author: Matthias Boehm <[email protected]>
Authored: Sat Dec 1 19:22:19 2018 +0100
Committer: Matthias Boehm <[email protected]>
Committed: Sat Dec 1 19:22:19 2018 +0100

----------------------------------------------------------------------
 .../spark/CumulativeAggregateSPInstruction.java         |  4 +++-
 .../instructions/spark/utils/RDDAggregateUtils.java     |  2 +-
 .../apache/sysml/runtime/matrix/data/MatrixBlock.java   | 12 ++++++++----
 3 files changed, 12 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/fee20fb9/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java
index a0dfb85..e8696da 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java
@@ -137,7 +137,9 @@ public class CumulativeAggregateSPInstruction extends 
AggregateUnarySPInstructio
                        int rlenBlk = IntUtils.toInt( 
Math.min(rlenOut-(rixOut-1)*_brlen, _brlen));
                        int clenBlk = blkOut.getNumColumns();
                        int posBlk = IntUtils.toInt((ixIn.getRowIndex()-1) % 
_brlen);
-                       MatrixBlock blkOut2 = new MatrixBlock(rlenBlk, clenBlk, 
false);
+                       
+                       //construct sparse output blocks (single row in target 
block size)
+                       MatrixBlock blkOut2 = new MatrixBlock(rlenBlk, clenBlk, 
true);
                        blkOut2.copy(posBlk, posBlk, 0, clenBlk-1, blkOut, 
true);
                        ixOut.setIndexes(rixOut, ixOut.getColumnIndex());
                        

http://git-wip-us.apache.org/repos/asf/systemml/blob/fee20fb9/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
index 23b6ad9..0b01099 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
@@ -644,7 +644,7 @@ public class RDDAggregateUtils
 
                        // execute merge (never pass by reference)
                        MatrixBlock ret = _deep ? new MatrixBlock(b1) : b1;
-                       ret.merge(b2, false);
+                       ret.merge(b2, false, false, _deep);
                        ret.examSparsity();
                        
                        // sanity check output number of non-zeros

http://git-wip-us.apache.org/repos/asf/systemml/blob/fee20fb9/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
index ae5ab84..1ff2f43 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
@@ -1623,10 +1623,14 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
         * @param appendOnly ?
         */
        public void merge(MatrixBlock that, boolean appendOnly) {
-               merge(that, appendOnly, false);
+               merge(that, appendOnly, false, true);
        }
        
        public void merge(MatrixBlock that, boolean appendOnly, boolean par) {
+               merge(that, appendOnly, par, true);
+       }
+       
+       public void merge(MatrixBlock that, boolean appendOnly, boolean par, 
boolean deep) {
                //check for empty input source (nothing to merge)
                if( that == null || that.isEmptyBlock(false) )
                        return;
@@ -1647,7 +1651,7 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
                //core matrix block merge (guaranteed non-empty source/target, 
nnz maintenance not required)
                long nnz = nonZeros + that.nonZeros;
                if( sparse )
-                       mergeIntoSparse(that, appendOnly);
+                       mergeIntoSparse(that, appendOnly, deep);
                else if( par )
                        mergeIntoDensePar(that);
                else
@@ -1723,7 +1727,7 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
                }
        }
 
-       private void mergeIntoSparse(MatrixBlock that, boolean appendOnly) {
+       private void mergeIntoSparse(MatrixBlock that, boolean appendOnly, 
boolean deep) {
                SparseBlock a = sparseBlock;
                final boolean COO = (a instanceof SparseBlockCOO);
                final int m = rlen;
@@ -1734,7 +1738,7 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
                                if( b.isEmpty(i) ) continue;
                                if( !COO && a.isEmpty(i) ) {
                                        //copy entire sparse row (no sort 
required)
-                                       a.set(i, b.get(i), true);
+                                       a.set(i, b.get(i), deep);
                                }
                                else {
                                        boolean appended = false;

Reply via email to