[SYSTEMML-821] Multi-threaded matrix block decompression, tests This patch adds multi-threaded decompression of compressed matrix blocks. Our approach is to work on row partitions of the compressed column groups, which enables parallel sparse row allocation, unsynchronized row appends, and per-partition sorting of rows when the output is sparse. On the sparse Imagenet dataset, runtime improved from 101s to 29s; the remaining time is still dominated by result allocation.
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/bcd71d34 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/bcd71d34 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/bcd71d34 Branch: refs/heads/master Commit: bcd71d3468db7e3e87a8636ac24dbbb8cc4817d3 Parents: 2455b65 Author: Matthias Boehm <[email protected]> Authored: Thu Aug 11 22:37:11 2016 -0700 Committer: Matthias Boehm <[email protected]> Committed: Fri Aug 12 09:49:59 2016 -0700 ---------------------------------------------------------------------- .../apache/sysml/runtime/compress/ColGroup.java | 6 +- .../sysml/runtime/compress/ColGroupBitmap.java | 12 ++- .../sysml/runtime/compress/ColGroupOLE.java | 50 +++++----- .../sysml/runtime/compress/ColGroupRLE.java | 32 ++++--- .../runtime/compress/ColGroupUncompressed.java | 10 +- .../runtime/compress/CompressedMatrixBlock.java | 98 +++++++++++++++++++- .../sysml/runtime/matrix/data/MatrixBlock.java | 16 +++- .../functions/compress/ParCompressionTest.java | 9 +- 8 files changed, 180 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/bcd71d34/src/main/java/org/apache/sysml/runtime/compress/ColGroup.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroup.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroup.java index 200e12a..a79f660 100644 --- a/src/main/java/org/apache/sysml/runtime/compress/ColGroup.java +++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroup.java @@ -152,7 +152,7 @@ public abstract class ColGroup implements Serializable * a matrix block where the columns covered by this column group * have not yet been filled in. 
*/ - public abstract void decompressToBlock(MatrixBlock target); + public abstract void decompressToBlock(MatrixBlock target, int rl, int ru); /** * Decompress the contents of this column group into uncompressed packed @@ -263,6 +263,8 @@ public abstract class ColGroup implements Serializable /** * * @param rnnz + * @param rl row lower bound, inclusive + * @param ru row upper bound, exclusive */ - protected abstract void countNonZerosPerRow(int[] rnnz); + protected abstract void countNonZerosPerRow(int[] rnnz, int rl, int ru); } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/bcd71d34/src/main/java/org/apache/sysml/runtime/compress/ColGroupBitmap.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupBitmap.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroupBitmap.java index 005673c..5bd3e52 100644 --- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupBitmap.java +++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupBitmap.java @@ -202,7 +202,7 @@ public abstract class ColGroupBitmap extends ColGroup //generic decompression for OLE/RLE, to be overwritten for performance @Override - public void decompressToBlock(MatrixBlock target) + public void decompressToBlock(MatrixBlock target, int rl, int ru) { final int numCols = getNumCols(); final int numVals = getNumValues(); @@ -215,9 +215,11 @@ public abstract class ColGroupBitmap extends ColGroup while (decoder.hasNext()) { int row = decoder.next(); - for (int colIx = 0; colIx < numCols; colIx++) { + if( row<rl ) continue; + if( row>ru ) break; + + for (int colIx = 0; colIx < numCols; colIx++) target.appendValue(row, colIndices[colIx], _values[valOff+colIx]); - } } } } @@ -432,7 +434,9 @@ public abstract class ColGroupBitmap extends ColGroup * valid until the next call to this method. May be reused across * calls. 
*/ - public abstract Iterator<Integer> getDecodeIterator(int bmpIx); + public abstract Iterator<Integer> getDecodeIterator(int k); + + //TODO getDecodeIterator(int k, int rl, int ru) /** * Utility function of sparse-unsafe operations. http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/bcd71d34/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java index 81d57dc..f648dee 100644 --- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java +++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java @@ -105,25 +105,24 @@ public class ColGroupOLE extends ColGroupBitmap } @Override - public Iterator<Integer> getDecodeIterator(int bmpIx) { - return new BitmapDecoderOLE(_data, _ptr[bmpIx], len(bmpIx)); + public Iterator<Integer> getDecodeIterator(int k) { + return new BitmapDecoderOLE(_data, _ptr[k], len(k)); } @Override - public void decompressToBlock(MatrixBlock target) + public void decompressToBlock(MatrixBlock target, int rl, int ru) { if( LOW_LEVEL_OPT && getNumValues() > 1 ) { final int blksz = BitmapEncoder.BITMAP_BLOCK_SZ; final int numCols = getNumCols(); final int numVals = getNumValues(); - final int n = getNumRows(); //cache blocking config and position array - int[] apos = new int[numVals]; + int[] apos = skipScan(numVals, rl); //cache conscious append via horizontal scans - for( int bi=0; bi<n; bi+=blksz ) { + for( int bi=rl; bi<ru; bi+=blksz ) { for (int k = 0, off=0; k < numVals; k++, off+=numCols) { int boff = _ptr[k]; int blen = len(k); @@ -143,7 +142,7 @@ public class ColGroupOLE extends ColGroupBitmap else { //call generic decompression with decoder - super.decompressToBlock(target); + super.decompressToBlock(target, rl, ru); } } @@ -581,27 +580,36 @@ public class ColGroupOLE extends ColGroupBitmap 
} @Override - protected void countNonZerosPerRow(int[] rnnz) + protected void countNonZerosPerRow(int[] rnnz, int rl, int ru) { final int blksz = BitmapEncoder.BITMAP_BLOCK_SZ; + final int blksz2 = ColGroupBitmap.WRITE_CACHE_BLKSZ; final int numVals = getNumValues(); final int numCols = getNumCols(); - //iterate over all values and their bitmaps - for (int k = 0; k < numVals; k++) - { - //prepare value-to-add for entire value bitmap - int boff = _ptr[k]; - int blen = len(k); + //current pos per OLs / output values + int[] apos = skipScan(numVals, rl); + + + //cache conscious count via horizontal scans + for( int bi=rl; bi<ru; bi+=blksz2 ) { + int bimax = Math.min(bi+blksz2, ru); - //iterate over bitmap blocks and add values - int off = 0; - int slen; - for (int bix=0; bix<blen; bix+=slen+1, off+=blksz) { - slen = _data[boff+bix]; - for (int blckIx = 1; blckIx <= slen; blckIx++) { - rnnz[off + _data[boff+bix + blckIx]] += numCols; + //iterate over all values and their bitmaps + for (int k = 0; k < numVals; k++) { + //prepare value-to-add for entire value bitmap + int boff = _ptr[k]; + int blen = len(k); + int bix = apos[k]; + + //iterate over bitmap blocks and add values + for( int off=bi, slen=0; bix<blen && off<bimax; bix+=slen+1, off+=blksz ) { + slen = _data[boff+bix]; + for (int blckIx = 1; blckIx <= slen; blckIx++) + rnnz[off + _data[boff+bix + blckIx] - bi] += numCols; } + + apos[k] = bix; } } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/bcd71d34/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java index 017a7d3..00b0ef9 100644 --- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java +++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java @@ -84,27 +84,26 @@ public class ColGroupRLE 
extends ColGroupBitmap } @Override - public Iterator<Integer> getDecodeIterator(int bmpIx) { - return new BitmapDecoderRLE(_data, _ptr[bmpIx], len(bmpIx)); + public Iterator<Integer> getDecodeIterator(int k) { + return new BitmapDecoderRLE(_data, _ptr[k], len(k)); } @Override - public void decompressToBlock(MatrixBlock target) + public void decompressToBlock(MatrixBlock target, int rl, int ru) { if( LOW_LEVEL_OPT && getNumValues() > 1 ) { final int blksz = 128 * 1024; final int numCols = getNumCols(); final int numVals = getNumValues(); - final int n = getNumRows(); //position and start offset arrays - int[] apos = new int[numVals]; int[] astart = new int[numVals]; + int[] apos = skipScan(numVals, rl, astart); //cache conscious append via horizontal scans - for( int bi=0; bi<n; bi+=blksz ) { - int bimax = Math.min(bi+blksz, n); + for( int bi=rl; bi<ru; bi+=blksz ) { + int bimax = Math.min(bi+blksz, ru); for (int k=0, off=0; k < numVals; k++, off+=numCols) { int boff = _ptr[k]; int blen = len(k); @@ -113,7 +112,7 @@ public class ColGroupRLE extends ColGroupBitmap for( ; bix<blen & start<bimax; bix+=2) { start += _data[boff + bix]; int len = _data[boff + bix+1]; - for( int i=start; i<start+len; i++ ) + for( int i=Math.max(rl,start); i<Math.min(start+len,ru); i++ ) for( int j=0; j<numCols; j++ ) if( _values[off+j]!=0 ) target.appendValue(i, _colIndexes[j], _values[off+j]); @@ -127,7 +126,7 @@ public class ColGroupRLE extends ColGroupBitmap else { //call generic decompression with decoder - super.decompressToBlock(target); + super.decompressToBlock(target, rl, ru); } } @@ -571,22 +570,27 @@ public class ColGroupRLE extends ColGroupBitmap } @Override - protected void countNonZerosPerRow(int[] rnnz) + protected void countNonZerosPerRow(int[] rnnz, int rl, int ru) { final int numVals = getNumValues(); final int numCols = getNumCols(); + //current pos / values per RLE list + int[] astart = new int[numVals]; + int[] apos = skipScan(numVals, rl, astart); + for (int k = 0; k 
< numVals; k++) { int boff = _ptr[k]; int blen = len(k); - + int bix = apos[k]; + int curRunStartOff = 0; int curRunEnd = 0; - for (int bix = 0; bix < blen; bix+=2) { + for( ; bix < blen && curRunStartOff<ru; bix+=2) { curRunStartOff = curRunEnd + _data[boff+bix]; curRunEnd = curRunStartOff + _data[boff+bix + 1]; - for( int i=curRunStartOff; i<curRunEnd; i++ ) - rnnz[i] += numCols; + for( int i=Math.max(curRunStartOff,rl); i<Math.min(curRunEnd, ru); i++ ) + rnnz[i-rl] += numCols; } } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/bcd71d34/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java index 43bfa72..0ac82aa 100644 --- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java +++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java @@ -204,11 +204,11 @@ public class ColGroupUncompressed extends ColGroup } @Override - public void decompressToBlock(MatrixBlock target) { + public void decompressToBlock(MatrixBlock target, int rl, int ru) { //empty block, nothing to add to output if( _data.isEmptyBlock(false) ) return; - for (int row = 0; row < _data.getNumRows(); row++) { + for (int row = rl; row < ru; row++) { for (int colIx = 0; colIx < _colIndexes.length; colIx++) { int col = _colIndexes[colIx]; double cellVal = _data.quickGetValue(row, colIx); @@ -363,9 +363,9 @@ public class ColGroupUncompressed extends ColGroup } @Override - protected void countNonZerosPerRow(int[] rnnz) + protected void countNonZerosPerRow(int[] rnnz, int rl, int ru) { - for( int i=0; i<_data.getNumRows(); i++ ) - rnnz[i] += _data.recomputeNonZeros(i, i, 0, _data.getNumColumns()-1); + for( int i=rl; i<ru; i++ ) + rnnz[i-rl] += _data.recomputeNonZeros(i, i, 0, 
_data.getNumColumns()-1); } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/bcd71d34/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java index 6e2bca8..3913721 100644 --- a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java +++ b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java @@ -523,12 +523,14 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable if( !isCompressed() ) return new MatrixBlock(this); + Timing time = new Timing(true); + //preallocation sparse rows to avoid repeated reallocations MatrixBlock ret = new MatrixBlock(getNumRows(), getNumColumns(), isInSparseFormat(), getNonZeros()); if( ret.isInSparseFormat() ) { int[] rnnz = new int[rlen]; for (ColGroup grp : _colGroups) - grp.countNonZerosPerRow(rnnz); + grp.countNonZerosPerRow(rnnz, 0, rlen); ret.allocateSparseRowsBlock(); SparseBlock rows = ret.getSparseBlock(); for( int i=0; i<rlen; i++ ) @@ -537,16 +539,67 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable //core decompression (append if sparse) for (ColGroup grp : _colGroups) - grp.decompressToBlock(ret); + grp.decompressToBlock(ret, 0, rlen); //post-processing (for append in decompress) - if( isInSparseFormat() ) + ret.setNonZeros(nonZeros); + if( ret.isInSparseFormat() ) ret.sortSparseRows(); + if( LOG.isDebugEnabled() ) + LOG.debug("decompressed block in "+time.stop()+"ms."); + return ret; } /** + * @param k degree of parallelism + * @return a new uncompressed matrix block containing the contents + * of this block + * @throws DMLRuntimeException + */ + public MatrixBlock decompress(int k) throws DMLRuntimeException + { + //early abort for not yet compressed 
blocks + if( !isCompressed() ) + return new MatrixBlock(this); + if( k <= 1 ) + return decompress(); + + Timing time = new Timing(true); + + MatrixBlock ret = new MatrixBlock(rlen, clen, sparse, nonZeros); + ret.allocateDenseOrSparseBlock(); + + //multi-threaded decompression + try { + ExecutorService pool = Executors.newFixedThreadPool( k ); + int rlen = getNumRows(); + int seqsz = BitmapEncoder.BITMAP_BLOCK_SZ; + int blklen = (int)(Math.ceil((double)rlen/k)); + blklen += (blklen%seqsz != 0)?seqsz-blklen%seqsz:0; + ArrayList<DecompressTask> tasks = new ArrayList<DecompressTask>(); + for( int i=0; i<k & i*blklen<getNumRows(); i++ ) + tasks.add(new DecompressTask(_colGroups, ret, i*blklen, Math.min((i+1)*blklen,rlen))); + List<Future<Object>> rtasks = pool.invokeAll(tasks); + pool.shutdown(); + for( Future<Object> rt : rtasks ) + rt.get(); //error handling + } + catch(Exception ex) { + throw new DMLRuntimeException(ex); + } + + //post-processing + ret.setNonZeros(nonZeros); + + if( LOG.isDebugEnabled() ) + LOG.debug("decompressed block w/ k="+k+" in "+time.stop()+"ms."); + + return ret; + } + + /** * * @return an upper bound on the memory used to store this compressed block * considering class overhead. 
@@ -1522,6 +1575,45 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable } } + private static class DecompressTask implements Callable<Object> + { + private List<ColGroup> _colGroups = null; + private MatrixBlock _ret = null; + private int _rl = -1; + private int _ru = -1; + + protected DecompressTask( List<ColGroup> colGroups, MatrixBlock ret, int rl, int ru ) { + _colGroups = colGroups; + _ret = ret; + _rl = rl; + _ru = ru; + } + + @Override + public Object call() throws DMLRuntimeException { + + //preallocate sparse rows to avoid repeated alloc + if( _ret.isInSparseFormat() ) { + int[] rnnz = new int[_ru-_rl]; + for (ColGroup grp : _colGroups) + grp.countNonZerosPerRow(rnnz, _rl, _ru); + SparseBlock rows = _ret.getSparseBlock(); + for( int i=_rl; i<_ru; i++ ) + rows.allocate(i, rnnz[i-_rl]); + } + + //decompress row partition + for (ColGroup grp : _colGroups) + grp.decompressToBlock(_ret, _rl, _ru); + + //post processing (sort due to append) + if( _ret.isInSparseFormat() ) + _ret.sortSparseRows(_rl, _ru); + + return null; + } + } + ////////////////////////////////////////// // Graceful fallback to uncompressed linear algebra http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/bcd71d34/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java index 5884768..8f84bd7 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java @@ -886,7 +886,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab } /** - * + * Sorts all existing sparse rows by column indexes. 
*/ public void sortSparseRows() { if( !sparse || sparseBlock==null ) @@ -895,6 +895,20 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab } /** + * Sorts all existing sparse rows in range [rl,ru) by + * column indexes. + * + * @param rl row lower bound, inclusive + * @param ru row upper bound, exclusive + */ + public void sortSparseRows(int rl, int ru) { + if( !sparse || sparseBlock==null ) + return; + for( int i=rl; i<ru; i++ ) + sparseBlock.sort(i); + } + + /** * Utility function for computing the min non-zero value. * * @return http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/bcd71d34/src/test/java/org/apache/sysml/test/integration/functions/compress/ParCompressionTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/compress/ParCompressionTest.java b/src/test/java/org/apache/sysml/test/integration/functions/compress/ParCompressionTest.java index e0fe847..603584c 100644 --- a/src/test/java/org/apache/sysml/test/integration/functions/compress/ParCompressionTest.java +++ b/src/test/java/org/apache/sysml/test/integration/functions/compress/ParCompressionTest.java @@ -19,6 +19,7 @@ package org.apache.sysml.test.integration.functions.compress; +import org.apache.sysml.runtime.compress.BitmapEncoder; import org.apache.sysml.runtime.compress.CompressedMatrixBlock; import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; import org.apache.sysml.runtime.matrix.data.MatrixBlock; @@ -32,7 +33,8 @@ import org.junit.Test; */ public class ParCompressionTest extends AutomatedTestBase { - private static final int rows = 1023; + //large enough for multiple parallel tasks and mis-aligned + private static final int rows = 3 * BitmapEncoder.BITMAP_BLOCK_SZ + 7; private static final int cols = 20; private static final double sparsity1 = 0.9; private static final double sparsity2 = 0.1; @@ -143,6 +145,7 @@ public 
class ParCompressionTest extends AutomatedTestBase } //generate input data + int k = InfrastructureAnalyzer.getLocalParallelism(); double min = (vtype==ValueType.CONST)? 10 : -10; double[][] input = TestUtils.generateTestMatrix(rows, cols, min, 10, sparsity, 7); if( vtype==ValueType.RAND_ROUND ) @@ -152,10 +155,10 @@ public class ParCompressionTest extends AutomatedTestBase //compress given matrix block CompressedMatrixBlock cmb = new CompressedMatrixBlock(mb); if( compress ) - cmb.compress(InfrastructureAnalyzer.getLocalParallelism()); + cmb.compress(k); //decompress the compressed matrix block - MatrixBlock tmp = cmb.decompress(); + MatrixBlock tmp = cmb.decompress(k); //compare result with input double[][] d1 = DataConverter.convertToDoubleMatrix(mb);
