Repository: incubator-systemml Updated Branches: refs/heads/master 9c4228215 -> 2d32f6d58
[SYSTEMML-382] Performance sparse unary aggregates (contiguous CSR/COO) This patch extends the sparse block abstraction to expose the meta data of contiguous underlying data structures (e.g., CSR, COO). In a first step, we exploit this for more efficient unary aggregates over sparse data. In detail, it affects the following opcodes: uakp, uackp, uasqkp, uacsqkp, uamin, uacmin, uamax, uacmax, uamean, uacmean, uavar, and uacvar. Furthermore, we now also use a more efficient method of computing correction counts for uamean/uacmean and uavar/uacvar. In an example of 1000 min(X) computations over a 100M x 10k matrix with sparsity 0.01 (~120GB RDD storage) on 6 worker nodes, this led to an end-to-end runtime improvement from 1,197s to 821s. Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/1b292834 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/1b292834 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/1b292834 Branch: refs/heads/master Commit: 1b292834f8ee09a8e059a9aec05e66af55f83d48 Parents: 9c42282 Author: Matthias Boehm <[email protected]> Authored: Sat Jan 23 19:37:18 2016 -0800 Committer: Matthias Boehm <[email protected]> Committed: Sat Jan 23 19:37:18 2016 -0800 ---------------------------------------------------------------------- .../sysml/runtime/matrix/data/LibMatrixAgg.java | 179 ++++++++++++------- .../sysml/runtime/matrix/data/SparseBlock.java | 9 + .../runtime/matrix/data/SparseBlockCOO.java | 5 + .../runtime/matrix/data/SparseBlockCSR.java | 5 + .../runtime/matrix/data/SparseBlockMCSR.java | 5 + 5 files changed, 137 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1b292834/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java index b52aaaf..2b90ca8 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java @@ -2118,9 +2118,14 @@ public class LibMatrixAgg */ private static void s_uakp( SparseBlock a, double[] c, int m, int n, KahanObject kbuff, KahanPlus kplus, int rl, int ru ) { - for( int i=rl; i<ru; i++ ) { - if( !a.isEmpty(i) ) - sum(a.values(i), a.pos(i), a.size(i), kbuff, kplus); + if( a.isContiguous() ) { + sum(a.values(rl), a.pos(rl), (int)a.size(rl, ru), kbuff, kplus); + } + else { + for( int i=rl; i<ru; i++ ) { + if( !a.isEmpty(i) ) + sum(a.values(i), a.pos(i), a.size(i), kbuff, kplus); + } } c[0] = kbuff._sum; c[1] = kbuff._correction; @@ -2161,10 +2166,15 @@ public class LibMatrixAgg private static void s_uackp( SparseBlock a, double[] c, int m, int n, KahanObject kbuff, KahanPlus kplus, int rl, int ru ) { //compute column aggregates - for( int i=rl; i<ru; i++ ) - if( !a.isEmpty(i) ) { - sumAgg( a.values(i), c, a.indexes(i), a.pos(i), a.size(i), n, kbuff, kplus ); + if( a.isContiguous() ) { + sumAgg( a.values(rl), c, a.indexes(rl), a.pos(rl), (int)a.size(rl, ru), n, kbuff, kplus ); + } + else { + for( int i=rl; i<ru; i++ ) { + if( !a.isEmpty(i) ) + sumAgg( a.values(i), c, a.indexes(i), a.pos(i), a.size(i), n, kbuff, kplus ); } + } } /** @@ -2185,9 +2195,13 @@ public class LibMatrixAgg private static void s_uasqkp(SparseBlock a, double[] c, int m, int n, KahanObject kbuff, KahanPlusSq kplusSq, int rl, int ru ) { - for (int i=rl; i<ru; i++) { - if (!a.isEmpty(i)) { - sumSq(a.values(i), a.pos(i), a.size(i), kbuff, kplusSq); + if( a.isContiguous() ) { + sumSq(a.values(rl), a.pos(rl), (int)a.size(rl, ru), kbuff, kplusSq); + } + else { + for (int i=rl; i<ru; i++) { + if (!a.isEmpty(i)) + sumSq(a.values(i), a.pos(i), a.size(i), kbuff, kplusSq); } } c[0] = kbuff._sum; @@ -2244,9 +2258,13 @@ public class LibMatrixAgg KahanPlusSq kplusSq, int rl, int ru ) { //compute column aggregates - for (int i=rl; i<ru; i++) { - if (!a.isEmpty(i)) { - sumSqAgg(a.values(i), c, a.indexes(i), a.pos(i), a.size(i), n, kbuff, kplusSq); + if( a.isContiguous() ) { + sumSqAgg(a.values(rl), c, a.indexes(rl), a.pos(rl), (int)a.size(rl, ru), n, kbuff, kplusSq); + } + else { + for (int i=rl; i<ru; i++) { + if (!a.isEmpty(i)) + sumSqAgg(a.values(i), c, a.indexes(i), a.pos(i), a.size(i), n, kbuff, kplusSq); } } } @@ -2392,17 +2410,26 @@ public class LibMatrixAgg private static void s_uamxx( SparseBlock a, double[] c, int m, int n, double init, Builtin builtin, int rl, int ru ) { double ret = init; //keep init val - for( int i=rl; i<ru; i++ ) - { - if( !a.isEmpty(i) ) { - double lval = builtin(a.values(i), a.pos(i), init, a.size(i), builtin); - ret = builtin.execute2(ret, lval); - } + if( a.isContiguous() ) { + int alen = (int) a.size(rl, ru); + double val = builtin(a.values(rl), a.pos(rl), init, alen, builtin); + ret = builtin.execute2(ret, val); //correction (not sparse-safe) - if( a.size(i) < n ) - ret = builtin.execute2(ret, 0); + ret = (alen<(ru-rl)*n) ? builtin.execute2(ret, 0) : ret; + } + else { + for( int i=rl; i<ru; i++ ) { + if( !a.isEmpty(i) ) { + double lval = builtin(a.values(i), a.pos(i), init, a.size(i), builtin); + ret = builtin.execute2(ret, lval); + } + //correction (not sparse-safe) + if( a.size(i) < n ) + ret = builtin.execute2(ret, 0); + } } + c[0] = ret; } @@ -2451,15 +2478,21 @@ public class LibMatrixAgg int[] cnt = new int[ n ]; //compute column aggregates min/max - for( int i=rl; i<ru; i++ ) - { - if( !a.isEmpty(i) ) { - int apos = a.pos(i); - int alen = a.size(i); - double[] avals = a.values(i); - int[] aix = a.indexes(i); - builtinAgg( avals, c, aix, apos, alen, builtin ); - countAgg( avals, cnt, aix, apos, alen ); + if( a.isContiguous() ) { + int alen = (int) a.size(rl, ru); + builtinAgg( a.values(rl), c, a.indexes(rl), a.pos(rl), alen, builtin ); + countAgg( a.values(rl), cnt, a.indexes(rl), a.pos(rl), alen ); + } + else { + for( int i=rl; i<ru; i++ ) { + if( !a.isEmpty(i) ) { + int apos = a.pos(i); + int alen = a.size(i); + double[] avals = a.values(i); + int[] aix = a.indexes(i); + builtinAgg( avals, c, aix, apos, alen, builtin ); + countAgg( avals, cnt, aix, apos, alen ); + } } } @@ -2577,22 +2610,24 @@ public class LibMatrixAgg //correction remaining tuples (not sparse-safe) //note: before aggregate computation in order to //exploit 0 sum (noop) and better numerical stability - for( int i=rl; i<ru; i++ ) - count += (a.isEmpty(i)) ? n : n-a.size(i); + count += (ru-rl)*n - a.size(rl, ru); //compute aggregate mean - for( int i=rl; i<ru; i++ ) - { - if( !a.isEmpty(i) ) { - int alen = a.size(i); - mean(a.values(i), a.pos(i), alen, count, kbuff, kmean); - count += alen; + if( a.isContiguous() ) { + int alen = (int) a.size(rl, ru); + mean(a.values(rl), a.pos(rl), alen, count, kbuff, kmean); + count += alen; + } + else { + for( int i=rl; i<ru; i++ ) { + if( !a.isEmpty(i) ) { + int alen = a.size(i); + mean(a.values(i), a.pos(i), alen, count, kbuff, kmean); + count += alen; + } } } - //OLD VERSION: correction remaining tuples (not sparse-safe) - //mean(0, len-count, count, kbuff, kplus); - c[0] = kbuff._sum; c[1] = len; c[2] = kbuff._correction; @@ -2622,10 +2657,6 @@ public class LibMatrixAgg mean(a.values(i), a.pos(i), a.size(i), count, kbuff, kmean); } - //OLD VERSION: correction remaining tuples (not sparse-safe) - //int count = ((arow==null) ? 0 : arow.size()); - //mean(0, n-count, count, kbuff, kplus); - c[cix+0] = kbuff._sum; c[cix+1] = n; c[cix+2] = kbuff._correction; @@ -2649,18 +2680,24 @@ public class LibMatrixAgg //note: before aggregate computation in order to //exploit 0 sum (noop) and better numerical stability Arrays.fill(c, n, n*2, ru-rl); - for( int i=rl; i<ru; i++ ) - { - if( !a.isEmpty(i) ) { - countDisAgg( a.values(i), c, a.indexes(i), a.pos(i), n, a.size(i) ); + if( a.isContiguous() ) { + countDisAgg( a.values(rl), c, a.indexes(rl), a.pos(rl), n, (int)a.size(rl, ru) ); + } + else { + for( int i=rl; i<ru; i++ ) { + if( !a.isEmpty(i) ) + countDisAgg( a.values(i), c, a.indexes(i), a.pos(i), n, a.size(i) ); } - } + } //compute column aggregate means - for( int i=rl; i<ru; i++ ) - { - if( !a.isEmpty(i) ) { - meanAgg( a.values(i), c, a.indexes(i), a.pos(i), a.size(i), n, kbuff, kmean ); + if( a.isContiguous() ) { + meanAgg( a.values(rl), c, a.indexes(rl), a.pos(rl), (int)a.size(rl, ru), n, kbuff, kmean ); + } + else { + for( int i=rl; i<ru; i++ ) { + if( !a.isEmpty(i) ) + meanAgg( a.values(i), c, a.indexes(i), a.pos(i), a.size(i), n, kbuff, kmean ); } } } @@ -2684,15 +2721,18 @@ public class LibMatrixAgg int rl, int ru) throws DMLRuntimeException { // compute and store count of empty cells before aggregation - int count = 0; - for (int i=rl; i<ru; i++) - count += (a.isEmpty(i)) ? n : n-a.size(i); + int count = (ru-rl)*n - (int)a.size(rl, ru); cbuff.w = count; // calculate aggregated variance (only using non-empty cells) - for (int i=rl; i<ru; i++) { - if (!a.isEmpty(i)) - var(a.values(i), a.pos(i), a.size(i), cbuff, cm); + if( a.isContiguous() ) { + var(a.values(rl), a.pos(rl), (int)a.size(rl, ru), cbuff, cm); + } + else { + for (int i=rl; i<ru; i++) { + if (!a.isEmpty(i)) + var(a.values(i), a.pos(i), a.size(i), cbuff, cm); + } } // store results: { var | mean, count, m2 correction, mean correction } @@ -2766,17 +2806,24 @@ public class LibMatrixAgg // - first, store total possible column counts in 3rd row of output Arrays.fill(c, n*2, n*3, ru-rl); // counts stored in 3rd row // - then subtract one from the column count for each dense value in the column - for (int i=rl; i<ru; i++) { - if (!a.isEmpty(i)) { - // counts stored in 3rd row - countDisAgg(a.values(i), c, a.indexes(i), a.pos(i), n*2, a.size(i)); + if( a.isContiguous() ) { + countDisAgg(a.values(rl), c, a.indexes(rl), a.pos(rl), n*2, (int)a.size(rl, ru)); + } + else { + for (int i=rl; i<ru; i++) { + if (!a.isEmpty(i)) // counts stored in 3rd row + countDisAgg(a.values(i), c, a.indexes(i), a.pos(i), n*2, a.size(i)); } } // calculate aggregated variance for each column - for (int i=rl; i<ru; i++) { - if (!a.isEmpty(i)) { - varAgg(a.values(i), c, a.indexes(i), a.pos(i), a.size(i), n, cbuff, cm); + if( a.isContiguous() ) { + varAgg(a.values(rl), c, a.indexes(rl), a.pos(rl), (int)a.size(rl, ru), n, cbuff, cm); + } + else { + for (int i=rl; i<ru; i++) { + if (!a.isEmpty(i)) + varAgg(a.values(i), c, a.indexes(i), a.pos(i), a.size(i), n, cbuff, cm); } } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1b292834/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlock.java index e340f5d..0786b87 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlock.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlock.java @@ -99,6 +99,15 @@ public abstract class SparseBlock implements Serializable public abstract boolean isThreadSafe(); /** + * Indicates if the underlying data structures returned by values + * and indexes are contiguous arrays, which can be exploited for + * more efficient operations. + * + * @return + */ + public abstract boolean isContiguous(); + + /** * Clears the sparse block by deleting non-zero values. After this call * all size() calls are guaranteed to return 0. */ http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1b292834/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCOO.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCOO.java b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCOO.java index 5643850..7173946 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCOO.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCOO.java @@ -176,6 +176,11 @@ public class SparseBlockCOO extends SparseBlock return false; } + @Override + public boolean isContiguous() { + return true; + } + @Override public void reset() { _size = 0; http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1b292834/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java index c599753..7a447bd 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java @@ -178,6 +178,11 @@ public class SparseBlockCSR extends SparseBlock return false; } + @Override + public boolean isContiguous() { + return true; + } + @Override public void reset() { _size = 0; http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1b292834/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockMCSR.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockMCSR.java b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockMCSR.java index 6e1cded..dfddf47 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockMCSR.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockMCSR.java @@ -137,6 +137,11 @@ public class SparseBlockMCSR extends SparseBlock public boolean isThreadSafe() { return true; } + + @Override + public boolean isContiguous() { + return false; + } @Override public void reset() {
