Repository: incubator-systemml
Updated Branches:
  refs/heads/master 9c4228215 -> 2d32f6d58


[SYSTEMML-382] Performance sparse unary aggregates (contiguous CSR/COO)

This patch extends the sparse block abstraction to expose the metadata
of contiguous underlying data structures (e.g., CSR, COO). As a first
step, we exploit this for more efficient unary aggregates over sparse
data. Specifically, it affects the following opcodes: uakp, uackp, uasqkp,
uacsqkp, uamin, uacmin, uamax, uacmax, uamean, uacmean, uavar, and
uacvar. Furthermore, we now also use a more efficient method of
computing correction counts for uamean/uacmean and uavar/uacvar.

In an example of 1000 min(X) computations over a 100M x 10k matrix with
sparsity 0.01 (~120GB RDD storage) on 6 worker nodes, this led to an
end-to-end runtime improvement from 1,197s to 821s.

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/1b292834
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/1b292834
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/1b292834

Branch: refs/heads/master
Commit: 1b292834f8ee09a8e059a9aec05e66af55f83d48
Parents: 9c42282
Author: Matthias Boehm <[email protected]>
Authored: Sat Jan 23 19:37:18 2016 -0800
Committer: Matthias Boehm <[email protected]>
Committed: Sat Jan 23 19:37:18 2016 -0800

----------------------------------------------------------------------
 .../sysml/runtime/matrix/data/LibMatrixAgg.java | 179 ++++++++++++-------
 .../sysml/runtime/matrix/data/SparseBlock.java  |   9 +
 .../runtime/matrix/data/SparseBlockCOO.java     |   5 +
 .../runtime/matrix/data/SparseBlockCSR.java     |   5 +
 .../runtime/matrix/data/SparseBlockMCSR.java    |   5 +
 5 files changed, 137 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1b292834/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
index b52aaaf..2b90ca8 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
@@ -2118,9 +2118,14 @@ public class LibMatrixAgg
         */
        private static void s_uakp( SparseBlock a, double[] c, int m, int n, 
KahanObject kbuff, KahanPlus kplus, int rl, int ru )
        {
-               for( int i=rl; i<ru; i++ ) {
-                       if( !a.isEmpty(i) )
-                               sum(a.values(i), a.pos(i), a.size(i), kbuff, 
kplus);
+               if( a.isContiguous() ) {
+                       sum(a.values(rl), a.pos(rl), (int)a.size(rl, ru), 
kbuff, kplus);
+               }
+               else {
+                       for( int i=rl; i<ru; i++ ) {
+                               if( !a.isEmpty(i) )
+                                       sum(a.values(i), a.pos(i), a.size(i), 
kbuff, kplus);
+                       }
                }
                c[0] = kbuff._sum;
                c[1] = kbuff._correction;       
@@ -2161,10 +2166,15 @@ public class LibMatrixAgg
        private static void s_uackp( SparseBlock a, double[] c, int m, int n, 
KahanObject kbuff, KahanPlus kplus, int rl, int ru ) 
        {
                //compute column aggregates
-               for( int i=rl; i<ru; i++ )
-                       if( !a.isEmpty(i) ) {
-                               sumAgg( a.values(i), c, a.indexes(i), a.pos(i), 
a.size(i), n, kbuff, kplus );
+               if( a.isContiguous() ) {
+                       sumAgg( a.values(rl), c, a.indexes(rl), a.pos(rl), 
(int)a.size(rl, ru), n, kbuff, kplus );
+               }
+               else {
+                       for( int i=rl; i<ru; i++ ) {
+                               if( !a.isEmpty(i) )
+                                       sumAgg( a.values(i), c, a.indexes(i), 
a.pos(i), a.size(i), n, kbuff, kplus );
                        }
+               }
        }
 
        /**
@@ -2185,9 +2195,13 @@ public class LibMatrixAgg
        private static void s_uasqkp(SparseBlock a, double[] c, int m, int n, 
KahanObject kbuff,
                                     KahanPlusSq kplusSq, int rl, int ru )
        {
-               for (int i=rl; i<ru; i++) {
-                       if (!a.isEmpty(i)) {
-                               sumSq(a.values(i), a.pos(i), a.size(i), kbuff, 
kplusSq);
+               if( a.isContiguous() ) {
+                       sumSq(a.values(rl), a.pos(rl), (int)a.size(rl, ru), 
kbuff, kplusSq);    
+               }
+               else {
+                       for (int i=rl; i<ru; i++) {
+                               if (!a.isEmpty(i))
+                                       sumSq(a.values(i), a.pos(i), a.size(i), 
kbuff, kplusSq);
                        }
                }
                c[0] = kbuff._sum;
@@ -2244,9 +2258,13 @@ public class LibMatrixAgg
                                      KahanPlusSq kplusSq, int rl, int ru )
        {
                //compute column aggregates
-               for (int i=rl; i<ru; i++) {
-                       if (!a.isEmpty(i)) {
-                               sumSqAgg(a.values(i), c, a.indexes(i), 
a.pos(i), a.size(i), n, kbuff, kplusSq);
+               if( a.isContiguous() ) {
+                       sumSqAgg(a.values(rl), c, a.indexes(rl), a.pos(rl), 
(int)a.size(rl, ru), n, kbuff, kplusSq);
+               }
+               else {
+                       for (int i=rl; i<ru; i++) {
+                               if (!a.isEmpty(i))
+                                       sumSqAgg(a.values(i), c, a.indexes(i), 
a.pos(i), a.size(i), n, kbuff, kplusSq);
                        }
                }
        }
@@ -2392,17 +2410,26 @@ public class LibMatrixAgg
        private static void s_uamxx( SparseBlock a, double[] c, int m, int n, 
double init, Builtin builtin, int rl, int ru )
        {
                double ret = init; //keep init val
-               for( int i=rl; i<ru; i++ )
-               {
-                       if( !a.isEmpty(i) ) {
-                               double lval = builtin(a.values(i), a.pos(i), 
init, a.size(i), builtin);
-                               ret = builtin.execute2(ret, lval);
-                       }
                
+               if( a.isContiguous() ) {
+                       int alen = (int) a.size(rl, ru);
+                       double val = builtin(a.values(rl), a.pos(rl), init, 
alen, builtin);
+                       ret = builtin.execute2(ret, val);
                        //correction (not sparse-safe)
-                       if( a.size(i) < n )
-                               ret = builtin.execute2(ret, 0); 
+                       ret = (alen<(ru-rl)*n) ? builtin.execute2(ret, 0) : 
ret;                                
+               }
+               else {
+                       for( int i=rl; i<ru; i++ ) {
+                               if( !a.isEmpty(i) ) {
+                                       double lval = builtin(a.values(i), 
a.pos(i), init, a.size(i), builtin);
+                                       ret = builtin.execute2(ret, lval);
+                               }               
+                               //correction (not sparse-safe)
+                               if( a.size(i) < n )
+                                       ret = builtin.execute2(ret, 0); 
+                       }       
                }
+       
                c[0] = ret; 
        }
        
@@ -2451,15 +2478,21 @@ public class LibMatrixAgg
                int[] cnt = new int[ n ]; 
 
                //compute column aggregates min/max
-               for( int i=rl; i<ru; i++ )
-               {
-                       if( !a.isEmpty(i) ) {
-                               int apos = a.pos(i);
-                               int alen = a.size(i);
-                               double[] avals = a.values(i);
-                               int[] aix = a.indexes(i);
-                               builtinAgg( avals, c, aix, apos, alen, builtin 
);
-                               countAgg( avals, cnt, aix, apos, alen );
+               if( a.isContiguous() ) {
+                       int alen = (int) a.size(rl, ru);
+                       builtinAgg( a.values(rl), c, a.indexes(rl), a.pos(rl), 
alen, builtin );
+                       countAgg( a.values(rl), cnt, a.indexes(rl), a.pos(rl), 
alen );
+               }
+               else {
+                       for( int i=rl; i<ru; i++ ) {
+                               if( !a.isEmpty(i) ) {
+                                       int apos = a.pos(i);
+                                       int alen = a.size(i);
+                                       double[] avals = a.values(i);
+                                       int[] aix = a.indexes(i);
+                                       builtinAgg( avals, c, aix, apos, alen, 
builtin );
+                                       countAgg( avals, cnt, aix, apos, alen );
+                               }
                        }
                }
                
@@ -2577,22 +2610,24 @@ public class LibMatrixAgg
                //correction remaining tuples (not sparse-safe)
                //note: before aggregate computation in order to
                //exploit 0 sum (noop) and better numerical stability
-               for( int i=rl; i<ru; i++ )
-                       count += (a.isEmpty(i)) ? n : n-a.size(i);
+               count += (ru-rl)*n - a.size(rl, ru);
                
                //compute aggregate mean
-               for( int i=rl; i<ru; i++ )
-               {
-                       if( !a.isEmpty(i) ) {
-                               int alen = a.size(i);
-                               mean(a.values(i), a.pos(i), alen, count, kbuff, 
kmean);
-                               count += alen;
+               if( a.isContiguous() ) {
+                       int alen = (int) a.size(rl, ru);
+                       mean(a.values(rl), a.pos(rl), alen, count, kbuff, 
kmean);
+                       count += alen;
+               }
+               else {
+                       for( int i=rl; i<ru; i++ ) {
+                               if( !a.isEmpty(i) ) {
+                                       int alen = a.size(i);
+                                       mean(a.values(i), a.pos(i), alen, 
count, kbuff, kmean);
+                                       count += alen;
+                               }
                        }
                }
 
-               //OLD VERSION: correction remaining tuples (not sparse-safe)
-               //mean(0, len-count, count, kbuff, kplus);
-               
                c[0] = kbuff._sum;
                c[1] = len;
                c[2] = kbuff._correction;
@@ -2622,10 +2657,6 @@ public class LibMatrixAgg
                                mean(a.values(i), a.pos(i), a.size(i), count, 
kbuff, kmean);
                        }
                        
-                       //OLD VERSION: correction remaining tuples (not 
sparse-safe)
-                       //int count = ((arow==null) ? 0 : arow.size());
-                       //mean(0, n-count, count, kbuff, kplus);
-                       
                        c[cix+0] = kbuff._sum;
                        c[cix+1] = n;
                        c[cix+2] = kbuff._correction;
@@ -2649,18 +2680,24 @@ public class LibMatrixAgg
                //note: before aggregate computation in order to
                //exploit 0 sum (noop) and better numerical stability
                Arrays.fill(c, n, n*2, ru-rl);
-               for( int i=rl; i<ru; i++ ) 
-               {
-                       if( !a.isEmpty(i) ) {
-                               countDisAgg( a.values(i), c, a.indexes(i), 
a.pos(i), n, a.size(i) );
+               if( a.isContiguous() ) {
+                       countDisAgg( a.values(rl), c, a.indexes(rl), a.pos(rl), 
n, (int)a.size(rl, ru) );
+               }
+               else {
+                       for( int i=rl; i<ru; i++ ) {
+                               if( !a.isEmpty(i) )
+                                       countDisAgg( a.values(i), c, 
a.indexes(i), a.pos(i), n, a.size(i) );
                        }
-               } 
+               }
                
                //compute column aggregate means
-               for( int i=rl; i<ru; i++ )
-               {
-                       if( !a.isEmpty(i) ) {
-                               meanAgg( a.values(i), c, a.indexes(i), 
a.pos(i), a.size(i), n, kbuff, kmean );
+               if( a.isContiguous() ) {
+                       meanAgg( a.values(rl), c, a.indexes(rl), a.pos(rl), 
(int)a.size(rl, ru), n, kbuff, kmean );
+               }
+               else {
+                       for( int i=rl; i<ru; i++ ) {
+                               if( !a.isEmpty(i) )
+                                       meanAgg( a.values(i), c, a.indexes(i), 
a.pos(i), a.size(i), n, kbuff, kmean );
                        }
                }
        }
@@ -2684,15 +2721,18 @@ public class LibMatrixAgg
                                    int rl, int ru) throws DMLRuntimeException
        {
                // compute and store count of empty cells before aggregation
-               int count = 0;
-               for (int i=rl; i<ru; i++)
-                       count += (a.isEmpty(i)) ? n : n-a.size(i);
+               int count = (ru-rl)*n - (int)a.size(rl, ru);
                cbuff.w = count;
 
                // calculate aggregated variance (only using non-empty cells)
-               for (int i=rl; i<ru; i++) {
-                       if (!a.isEmpty(i))
-                               var(a.values(i), a.pos(i), a.size(i), cbuff, 
cm);
+               if( a.isContiguous() ) {
+                       var(a.values(rl), a.pos(rl), (int)a.size(rl, ru), 
cbuff, cm);
+               }
+               else {
+                       for (int i=rl; i<ru; i++) {
+                               if (!a.isEmpty(i))
+                                       var(a.values(i), a.pos(i), a.size(i), 
cbuff, cm);
+                       }
                }
 
                // store results: { var | mean, count, m2 correction, mean 
correction }
@@ -2766,17 +2806,24 @@ public class LibMatrixAgg
                // - first, store total possible column counts in 3rd row of 
output
                Arrays.fill(c, n*2, n*3, ru-rl); // counts stored in 3rd row
                // - then subtract one from the column count for each dense 
value in the column
-               for (int i=rl; i<ru; i++) {
-                       if (!a.isEmpty(i)) {
-                               // counts stored in 3rd row
-                               countDisAgg(a.values(i), c, a.indexes(i), 
a.pos(i), n*2, a.size(i)); 
+               if( a.isContiguous() ) {
+                       countDisAgg(a.values(rl), c, a.indexes(rl), a.pos(rl), 
n*2, (int)a.size(rl, ru)); 
+               }
+               else {
+                       for (int i=rl; i<ru; i++) {
+                               if (!a.isEmpty(i)) // counts stored in 3rd row
+                                       countDisAgg(a.values(i), c, 
a.indexes(i), a.pos(i), n*2, a.size(i)); 
                        }
                }
 
                // calculate aggregated variance for each column
-               for (int i=rl; i<ru; i++) {
-                       if (!a.isEmpty(i)) {
-                               varAgg(a.values(i), c, a.indexes(i), a.pos(i), 
a.size(i), n, cbuff, cm);
+               if( a.isContiguous() ) {
+                       varAgg(a.values(rl), c, a.indexes(rl), a.pos(rl), 
(int)a.size(rl, ru), n, cbuff, cm);
+               }
+               else {
+                       for (int i=rl; i<ru; i++) {
+                               if (!a.isEmpty(i))
+                                       varAgg(a.values(i), c, a.indexes(i), 
a.pos(i), a.size(i), n, cbuff, cm);
                        }
                }
        }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1b292834/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlock.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlock.java
index e340f5d..0786b87 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlock.java
@@ -99,6 +99,15 @@ public abstract class SparseBlock implements Serializable
        public abstract boolean isThreadSafe();
 
        /**
+        * Indicates if the underlying data structures returned by values 
+        * and indexes are contiguous arrays, which can be exploited for 
+        * more efficient operations.
+        * 
+        * @return
+        */
+       public abstract boolean isContiguous();
+       
+       /**
         * Clears the sparse block by deleting non-zero values. After this call
         * all size() calls are guaranteed to return 0.
         */

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1b292834/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCOO.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCOO.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCOO.java
index 5643850..7173946 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCOO.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCOO.java
@@ -176,6 +176,11 @@ public class SparseBlockCOO extends SparseBlock
                return false;
        }
        
+       @Override
+       public boolean isContiguous() {
+               return true;
+       }
+       
        @Override 
        public void reset() {
                _size = 0;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1b292834/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java
index c599753..7a447bd 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java
@@ -178,6 +178,11 @@ public class SparseBlockCSR extends SparseBlock
                return false;
        }
        
+       @Override
+       public boolean isContiguous() {
+               return true;
+       }
+       
        @Override 
        public void reset() {
                _size = 0;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1b292834/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockMCSR.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockMCSR.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockMCSR.java
index 6e1cded..dfddf47 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockMCSR.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockMCSR.java
@@ -137,6 +137,11 @@ public class SparseBlockMCSR extends SparseBlock
        public boolean isThreadSafe() {
                return true;
        }
+       
+       @Override
+       public boolean isContiguous() {
+               return false;
+       }
 
        @Override 
        public void reset() {

Reply via email to