Repository: systemml Updated Branches: refs/heads/master f7078c292 -> 4b6b8d2df
[SYSTEMML-2247] Fix missing block merge large dense blocks >16GB This patch adds the missing support for large (i.e., partitioned) dense blocks >16GB to the primitive for merging matrix blocks, as used in various distributed operations such as parfor result merge, which may operate on very large result variables. Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/4b6b8d2d Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/4b6b8d2d Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/4b6b8d2d Branch: refs/heads/master Commit: 4b6b8d2df3510f51190b8550fbd7f603b248321d Parents: f7078c2 Author: Matthias Boehm <[email protected]> Authored: Tue May 22 13:48:57 2018 -0700 Committer: Matthias Boehm <[email protected]> Committed: Tue May 22 14:44:51 2018 -0700 ---------------------------------------------------------------------- .../sysml/runtime/matrix/data/MatrixBlock.java | 107 ++++++++++--------- 1 file changed, 58 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/4b6b8d2d/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java index 61a1086..78f4fde 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java @@ -1617,77 +1617,86 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab nonZeros = nnz; } - private void mergeIntoDense(MatrixBlock that) - { + private void mergeIntoDense(MatrixBlock that) { + DenseBlock a = getDenseBlock(); if( that.sparse ) { //DENSE <- SPARSE - double[] a = getDenseBlockValues(); SparseBlock b = 
that.sparseBlock; int m = rlen; - int n = clen; - for( int i=0, aix=0; i<m; i++, aix+=n ) { + for( int i=0; i<m; i++ ) { if( b.isEmpty(i) ) continue; int bpos = b.pos(i); int blen = b.size(i); int[] bix = b.indexes(i); - double[] bval = b.values(i); - for( int j=bpos; j<bpos+blen; j++ ) - if( bval[j] != 0 ) - a[ aix + bix[j] ] = bval[j]; + double[] avals = a.values(i); + double[] bvals = b.values(i); + int aix = a.pos(i); + for( int j=bpos; j<bpos+blen; j++ ) { + double bval = bvals[j]; + if( bval != 0 ) + avals[aix+bix[j]] = bval; + } } } else { //DENSE <- DENSE - double[] a = getDenseBlockValues(); - double[] b = that.getDenseBlockValues(); - int len = rlen * clen; - for( int i=0; i<len; i++ ) - a[i] = ( b[i] != 0 ) ? b[i] : a[i]; + DenseBlock b = that.getDenseBlock(); + for(int bi=0; bi<a.numBlocks(); bi++) { + double[] avals = a.valuesAt(bi); + double[] bvals = b.valuesAt(bi); + int blen = a.size(bi); + for( int j=0; j<blen; j++ ) + avals[j] = bvals[j]!=0 ? bvals[j] : avals[j]; + } } } - private void mergeIntoDensePar(MatrixBlock that) - { + private void mergeIntoDensePar(MatrixBlock that) { + DenseBlock a = getDenseBlock(); if( that.sparse ) { //DENSE <- SPARSE - double[] a = getDenseBlockValues(); SparseBlock b = that.sparseBlock; - IntStream.range(0, rlen).parallel().forEach(i -> { - if( b.isEmpty(i) ) return; - int aix = i*clen; - int bpos = b.pos(i); - int blen = b.size(i); - int[] bix = b.indexes(i); - double[] bval = b.values(i); - for( int j=bpos; j<bpos+blen; j++ ) - if( bval[j] != 0 ) - a[ aix + bix[j] ] = bval[j]; - }); + int roff = 0; //row offset + for( int bi=0; bi<a.numBlocks(); bi++ ) { + double[] avals = a.valuesAt(bi); + int alen = a.blockSize(bi); + final int lroff = roff; //final for lambda + IntStream.range(lroff, lroff+alen).parallel().forEach(i -> { + if( b.isEmpty(i) ) return; + int aix = (i-lroff)*clen; + int bpos = b.pos(i); + int blen = b.size(i); + int[] bix = b.indexes(i); + double[] bval = b.values(i); + for( int j=bpos; 
j<bpos+blen; j++ ) + if( bval[j] != 0 ) + avals[aix+bix[j]] = bval[j]; + }); + roff += alen; + } } else { //DENSE <- DENSE - double[] a = getDenseBlockValues(); - double[] b = that.getDenseBlockValues(); - Arrays.parallelSetAll(a, i -> (b[i]!=0) ? b[i] : a[i]); + DenseBlock b = that.getDenseBlock(); + for(int bi=0; bi<a.numBlocks(); bi++) { + double[] avals = a.valuesAt(bi); + double[] bvals = b.valuesAt(bi); + Arrays.parallelSetAll(avals, + i -> (bvals[i]!=0) ? bvals[i] : avals[i]); + } } } - private void mergeIntoSparse(MatrixBlock that, boolean appendOnly) - { + private void mergeIntoSparse(MatrixBlock that, boolean appendOnly) { SparseBlock a = sparseBlock; final boolean COO = (a instanceof SparseBlockCOO); final int m = rlen; final int n = clen; - - if( that.sparse ) //SPARSE <- SPARSE - { + if( that.sparse ) { //SPARSE <- SPARSE SparseBlock b = that.sparseBlock; - - for( int i=0; i<m; i++ ) - { + for( int i=0; i<m; i++ ) { if( b.isEmpty(i) ) continue; if( !COO && a.isEmpty(i) ) { //copy entire sparse row (no sort required) a.set(i, b.get(i), true); } - else - { + else { boolean appended = false; int bpos = b.pos(i); int blen = b.size(i); @@ -1705,15 +1714,16 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab } } } - else //SPARSE <- DENSE - { - double[] b = that.getDenseBlockValues(); - - for( int i=0, bix=0; i<m; i++, bix+=n ) { + else { //SPARSE <- DENSE + DenseBlock b = that.getDenseBlock(); + for( int i=0; i<m; i++ ) { + double[] bvals = b.values(i); + int bix = b.pos(i); boolean appended = false; for( int j=0; j<n; j++ ) { - if( b[bix+j] != 0 ) { - appendValue(i, j, b[bix+j]); //incl alloc + double bval = bvals[bix+j]; + if( bval != 0 ) { + appendValue(i, j, bval); //incl alloc appended = true; } } @@ -1722,7 +1732,6 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab a.sort(i); } } - //full sort of coordinate blocks if( COO && !appendOnly ) a.sort();
