[SYSTEMML-2029] Fix synchronization issue in parallel sparse binary block reader. The parallel (i.e., multi-threaded) reader for sparse binary block matrices pre-allocates one sparse row per row of blocks, to serve as a synchronization object. In special scenarios involving shallow/deep copies of sparse rows (if the first block is read into an empty synchronization point), these objects are overwritten, leading to lost synchronization points and thus potential corruption on concurrent updates. We guard against this unwanted overwrite by checking whether a sparse row is already allocated, rather than whether it is empty.
Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/aefab8f8 Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/aefab8f8 Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/aefab8f8 Branch: refs/heads/master Commit: aefab8f8c1ce5c419f02a8f39b457f127499b9a4 Parents: fbec479 Author: Matthias Boehm <[email protected]> Authored: Wed Nov 29 18:53:40 2017 -0800 Committer: Matthias Boehm <[email protected]> Committed: Wed Nov 29 19:11:54 2017 -0800 ---------------------------------------------------------------------- .../sysml/runtime/matrix/data/MatrixBlock.java | 2 +- .../sysml/runtime/matrix/data/SparseBlock.java | 9 ++++ .../runtime/matrix/data/SparseBlockCOO.java | 5 ++ .../runtime/matrix/data/SparseBlockCSR.java | 5 ++ .../runtime/matrix/data/SparseBlockMCSR.java | 43 ++++++++-------- .../runtime/matrix/data/SparseRowVector.java | 52 ++++++-------------- 6 files changed, 57 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/aefab8f8/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java index f176c9a..22b20e4 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java @@ -738,7 +738,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab int aix = rowoffset+i; //single block append (avoid re-allocations) - if( sparseBlock.isEmpty(aix) && coloffset==0 ) { + if( !sparseBlock.isAllocated(aix) && coloffset==0 ) { //note: the deep copy flag is only relevant for MCSR due to //shallow references of b.get(i); other block formats do 
not //require a redundant copy because b.get(i) created a new row. http://git-wip-us.apache.org/repos/asf/systemml/blob/aefab8f8/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlock.java index 1ece183..00b59e0 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlock.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlock.java @@ -166,6 +166,15 @@ public abstract class SparseBlock implements Serializable } /** + * Indicates if the underlying data structure for a given row + * is already allocated. + * + * @param r row index + * @return true if already allocated + */ + public abstract boolean isAllocated(int r); + + /** * Clears the sparse block by deleting non-zero values. After this call * all size() calls are guaranteed to return 0. 
*/ http://git-wip-us.apache.org/repos/asf/systemml/blob/aefab8f8/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCOO.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCOO.java b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCOO.java index 295a545..de43855 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCOO.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCOO.java @@ -186,6 +186,11 @@ public class SparseBlockCOO extends SparseBlock return true; } + @Override + public boolean isAllocated(int r) { + return true; + } + @Override public void reset() { _size = 0; http://git-wip-us.apache.org/repos/asf/systemml/blob/aefab8f8/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java index 19fbe50..228a806 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java @@ -337,6 +337,11 @@ public class SparseBlockCSR extends SparseBlock return true; } + @Override + public boolean isAllocated(int r) { + return true; + } + @Override public void reset() { if( _size > 0 ) { http://git-wip-us.apache.org/repos/asf/systemml/blob/aefab8f8/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockMCSR.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockMCSR.java b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockMCSR.java index 2c04865..9aca9e5 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockMCSR.java +++ 
b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockMCSR.java @@ -47,7 +47,7 @@ public class SparseBlockMCSR extends SparseBlock SparseRow[] orows = ((SparseBlockMCSR)sblock)._rows; _rows = new SparseRow[orows.length]; for( int i=0; i<_rows.length; i++ ) - _rows[i] = new SparseRowVector(orows[i]); + _rows[i] = new SparseRowVector(orows[i]); } //general case SparseBlock else { @@ -81,7 +81,7 @@ public class SparseBlockMCSR extends SparseBlock } } else { - _rows = rows; + _rows = rows; } } @@ -120,24 +120,22 @@ public class SparseBlockMCSR extends SparseBlock @Override public void allocate(int r) { - if( _rows[r] == null ) + if( !isAllocated(r) ) _rows[r] = new SparseRowVector(); } @Override public void allocate(int r, int nnz) { - if( _rows[r] == null ) { + if( !isAllocated(r) ) _rows[r] = (nnz == 1) ? new SparseRowScalar() : new SparseRowVector(nnz); - } } @Override public void allocate(int r, int ennz, int maxnnz) { - if( _rows[r] == null ) { + if( !isAllocated(r) ) _rows[r] = (ennz == 1) ? new SparseRowScalar() : new SparseRowVector(ennz, maxnnz); - } } @Override @@ -154,7 +152,12 @@ public class SparseBlockMCSR extends SparseBlock public boolean isContiguous() { return false; } - + + @Override + public boolean isAllocated(int r) { + return (_rows[r] != null); + } + @Override public void reset() { for( SparseRow row : _rows ) @@ -171,7 +174,7 @@ public class SparseBlockMCSR extends SparseBlock @Override public void reset(int r, int ennz, int maxnnz) { - if( _rows[r] != null ) + if( isAllocated(r) ) _rows[r].reset(ennz, maxnnz); } @@ -188,14 +191,14 @@ public class SparseBlockMCSR extends SparseBlock @Override public int size(int r) { //prior check with isEmpty(r) expected - return (_rows[r]!=null) ? _rows[r].size() : 0; + return isAllocated(r) ? _rows[r].size() : 0; } @Override public long size(int rl, int ru) { int ret = 0; for( int i=rl; i<ru; i++ ) - ret += (_rows[i]!=null) ? _rows[i].size() : 0; + ret += isAllocated(i) ? 
_rows[i].size() : 0; return ret; } @@ -213,7 +216,7 @@ public class SparseBlockMCSR extends SparseBlock @Override public boolean isEmpty(int r) { - return (_rows[r]==null || _rows[r].isEmpty()); + return (!isAllocated(r) || _rows[r].isEmpty()); } @Override @@ -236,7 +239,7 @@ public class SparseBlockMCSR extends SparseBlock @Override public boolean set(int r, int c, double v) { - if( _rows[r] == null ) + if( !isAllocated(r) ) _rows[r] = new SparseRowScalar(); else if( _rows[r] instanceof SparseRowScalar && !_rows[r].isEmpty()) _rows[r] = new SparseRowVector(_rows[r]); @@ -246,18 +249,18 @@ public class SparseBlockMCSR extends SparseBlock @Override public void set(int r, SparseRow row, boolean deep) { //copy values into existing row to avoid allocation - if( _rows[r] != null && _rows[r] instanceof SparseRowVector + if( isAllocated(r) && _rows[r] instanceof SparseRowVector && ((SparseRowVector)_rows[r]).capacity() >= row.size() && deep ) ((SparseRowVector)_rows[r]).copy(row); //set new sparse row (incl allocation if required) else - _rows[r] = (deep && row != null) ? - new SparseRowVector(row) : row; + _rows[r] = (deep && row != null) ? 
+ new SparseRowVector(row) : row; } @Override public void append(int r, int c, double v) { - if( _rows[r] == null ) + if( !isAllocated(r) ) _rows[r] = new SparseRowScalar(); else if( _rows[r] instanceof SparseRowScalar && !_rows[r].isEmpty() ) _rows[r] = new SparseRowVector(_rows[r]); @@ -266,7 +269,7 @@ public class SparseBlockMCSR extends SparseBlock @Override public void setIndexRange(int r, int cl, int cu, double[] v, int vix, int len) { - if( _rows[r] == null ) + if( !isAllocated(r) ) _rows[r] = new SparseRowVector(); else if( _rows[r] instanceof SparseRowScalar ) _rows[r] = new SparseRowVector(_rows[r]); @@ -298,7 +301,7 @@ public class SparseBlockMCSR extends SparseBlock @Override public double get(int r, int c) { - if( _rows[r] == null ) + if( !isAllocated(r) ) return 0; return _rows[r].get(c); } @@ -346,7 +349,7 @@ public class SparseBlockMCSR extends SparseBlock sb.append(": "); sb.append(_rows[i]); sb.append("\n"); - } + } return sb.toString(); } http://git-wip-us.apache.org/repos/asf/systemml/blob/aefab8f8/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowVector.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowVector.java b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowVector.java index 4927906..a73ed1a 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowVector.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowVector.java @@ -54,7 +54,7 @@ public final class SparseRowVector extends SparseRow implements Serializable estimatedNzs = estnnz; maxNzs = maxnnz; int capacity = ((estnnz<initialCapacity && estnnz>0) ? 
- estnnz : initialCapacity); + estnnz : initialCapacity); values = new double[capacity]; indexes = new int[capacity]; } @@ -163,10 +163,10 @@ public final class SparseRowVector extends SparseRow implements Serializable shiftLeftAndDelete(index); return true; // nnz-- } - else { + else { values[index] = v; return false; - } + } } //early abort on zero (if no overwrite) @@ -201,68 +201,44 @@ public final class SparseRowVector extends SparseRow implements Serializable @Override public double get(int col) { //search for existing col index - int index = Arrays.binarySearch(indexes, 0, size, col); - if( index >= 0 ) - return values[index]; - else - return 0; + int index = Arrays.binarySearch(indexes, 0, size, col); + return (index >= 0) ? values[index] : 0; } public int searchIndexesFirstLTE(int col) { //search for existing col index int index = Arrays.binarySearch(indexes, 0, size, col); - if( index >= 0 ) { - if( index < size ) - return index; - else - return -1; - } + if( index >= 0 ) + return (index < size) ? index : -1; //search lt col index (see binary search) index = Math.abs( index+1 ); - if( index-1 < size ) - return index-1; - else - return -1; + return (index-1 < size) ? index-1 : -1; } public int searchIndexesFirstGTE(int col) { //search for existing col index int index = Arrays.binarySearch(indexes, 0, size, col); - if( index >= 0 ) { - if( index < size ) - return index; - else - return -1; - } + if( index >= 0 ) + return (index < size) ? index : -1; //search gt col index (see binary search) index = Math.abs( index+1 ); - if( index < size ) - return index; - else - return -1; + return (index < size) ? index : -1; } public int searchIndexesFirstGT(int col) { //search for existing col index int index = Arrays.binarySearch(indexes, 0, size, col); - if( index >= 0 ) { - if( index+1 < size ) - return index+1; - else - return -1; - } + if( index >= 0 ) + return (index+1 < size) ? 
index+1 : -1; //search gt col index (see binary search) index = Math.abs( index+1 ); - if( index < size ) - return index; - else - return -1; + return (index < size) ? index : -1; } public void deleteIndexRange(int lowerCol, int upperCol)
