Repository: incubator-systemml Updated Branches: refs/heads/master 1035699c3 -> 8a05574c8
[SYSTEMML-813] Decompression on unsupported compressed block operations Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/b233b599 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/b233b599 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/b233b599 Branch: refs/heads/master Commit: b233b5998bd4b77876c54f15da311d9c6207760f Parents: 1035699 Author: Matthias Boehm <[email protected]> Authored: Wed Jul 20 21:20:44 2016 -0700 Committer: Matthias Boehm <[email protected]> Committed: Thu Jul 21 12:54:11 2016 -0700 ---------------------------------------------------------------------- .../runtime/compress/CompressedMatrixBlock.java | 445 ++++++++++++++++++- .../sysml/runtime/matrix/data/MatrixBlock.java | 4 +- 2 files changed, 446 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b233b599/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java index f5b2058..aa21e16 100644 --- a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java +++ b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java @@ -38,6 +38,7 @@ import java.util.concurrent.Future; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.commons.math3.random.Well1024a; import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.sysml.hops.OptimizerUtils; @@ -50,22 +51,37 @@ import org.apache.sysml.runtime.compress.estim.CompressedSizeInfo; import org.apache.sysml.runtime.compress.estim.SizeEstimatorFactory; import org.apache.sysml.runtime.compress.utils.ConverterUtils; import org.apache.sysml.runtime.compress.utils.LinearAlgebraUtils; +import org.apache.sysml.runtime.controlprogram.caching.CacheBlock; +import org.apache.sysml.runtime.controlprogram.caching.MatrixObject.UpdateType; import org.apache.sysml.runtime.controlprogram.parfor.stat.Timing; import org.apache.sysml.runtime.functionobjects.KahanPlus; import org.apache.sysml.runtime.functionobjects.KahanPlusSq; import org.apache.sysml.runtime.functionobjects.Multiply; import org.apache.sysml.runtime.functionobjects.ReduceRow; +import org.apache.sysml.runtime.instructions.cp.CM_COV_Object; import org.apache.sysml.runtime.instructions.cp.KahanObject; +import org.apache.sysml.runtime.instructions.cp.ScalarObject; +import org.apache.sysml.runtime.matrix.data.CTableMap; import org.apache.sysml.runtime.matrix.data.LibMatrixBincell; import org.apache.sysml.runtime.matrix.data.LibMatrixReorg; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.MatrixIndexes; import org.apache.sysml.runtime.matrix.data.MatrixValue; +import org.apache.sysml.runtime.matrix.data.RandomMatrixGenerator; import org.apache.sysml.runtime.matrix.data.SparseBlock; +import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue; import org.apache.sysml.runtime.matrix.operators.AggregateBinaryOperator; +import org.apache.sysml.runtime.matrix.operators.AggregateOperator; import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator; import org.apache.sysml.runtime.matrix.operators.BinaryOperator; +import org.apache.sysml.runtime.matrix.operators.CMOperator; +import org.apache.sysml.runtime.matrix.operators.COVOperator; +import org.apache.sysml.runtime.matrix.operators.Operator; +import org.apache.sysml.runtime.matrix.operators.QuaternaryOperator; +import org.apache.sysml.runtime.matrix.operators.ReorgOperator; import org.apache.sysml.runtime.matrix.operators.ScalarOperator; +import org.apache.sysml.runtime.matrix.operators.UnaryOperator; +import org.apache.sysml.runtime.util.IndexRange; /** * Experimental version of MatrixBlock that allows a compressed internal @@ -634,7 +650,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable if( !isCompressed() ) { if( that instanceof CompressedMatrixBlock ) that = ((CompressedMatrixBlock) that).decompress(); - return super.appendOperations(that, ret); + return super.appendOperations(that, ret, true); } final int m = rlen; @@ -1339,4 +1355,431 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable return _ret; } } + + ////////////////////////////////////////// + // Graceful fallback to uncompressed linear algebra + + @Override + public MatrixValue unaryOperations(UnaryOperator op, MatrixValue result) + throws DMLRuntimeException { + printDecompressWarning("unaryOperations"); + MatrixBlock tmp = isCompressed() ? decompress() : this; + return tmp.unaryOperations(op, result); + } + + @Override + public void unaryOperationsInPlace(UnaryOperator op) + throws DMLRuntimeException { + printDecompressWarning("unaryOperationsInPlace"); + MatrixBlock tmp = isCompressed() ? decompress() : this; + tmp.unaryOperationsInPlace(op); + } + + @Override + public MatrixValue binaryOperations(BinaryOperator op, MatrixValue thatValue, MatrixValue result) + throws DMLRuntimeException { + printDecompressWarning("binaryOperations", (MatrixBlock)thatValue); + MatrixBlock left = isCompressed() ? decompress() : this; + MatrixBlock right = getUncompressed(thatValue); + return left.binaryOperations(op, right, result); + } + + @Override + public void binaryOperationsInPlace(BinaryOperator op, MatrixValue thatValue) + throws DMLRuntimeException { + printDecompressWarning("binaryOperationsInPlace", (MatrixBlock)thatValue); + MatrixBlock left = isCompressed() ? decompress() : this; + MatrixBlock right = getUncompressed(thatValue); + left.binaryOperationsInPlace(op, right); + } + + @Override + public void incrementalAggregate(AggregateOperator aggOp, MatrixValue correction, MatrixValue newWithCorrection) + throws DMLRuntimeException { + throw new DMLRuntimeException("CompressedMatrixBlock: incrementalAggregate not supported."); + } + + @Override + public void incrementalAggregate(AggregateOperator aggOp, MatrixValue newWithCorrection) + throws DMLRuntimeException { + throw new DMLRuntimeException("CompressedMatrixBlock: incrementalAggregate not supported."); + } + + @Override + public MatrixValue reorgOperations(ReorgOperator op, MatrixValue ret, int startRow, int startColumn, int length) + throws DMLRuntimeException { + printDecompressWarning("reorgOperations"); + MatrixBlock tmp = isCompressed() ? decompress() : this; + return tmp.reorgOperations(op, ret, startRow, startColumn, length); + } + + @Override + public MatrixBlock appendOperations(MatrixBlock that, MatrixBlock ret, boolean cbind) + throws DMLRuntimeException { + if( cbind ) //use supported operation + return appendOperations(that, ret); + printDecompressWarning("appendOperations-rbind", that); + MatrixBlock left = isCompressed() ? decompress() : this; + MatrixBlock right = getUncompressed(that); + return left.appendOperations(right, ret, cbind); + } + + @Override + public void appendOperations(MatrixValue v2, + ArrayList<IndexedMatrixValue> outlist, int blockRowFactor, + int blockColFactor, boolean cbind, boolean m2IsLast, int nextNCol) + throws DMLRuntimeException { + printDecompressWarning("appendOperations", (MatrixBlock)v2); + MatrixBlock left = isCompressed() ? decompress() : this; + MatrixBlock right = getUncompressed(v2); + left.appendOperations(right, outlist, blockRowFactor, blockColFactor, cbind, m2IsLast, nextNCol); + } + + @Override + public void permutationMatrixMultOperations(MatrixValue m2Val, MatrixValue out1Val, MatrixValue out2Val) + throws DMLRuntimeException { + permutationMatrixMultOperations(m2Val, out1Val, out2Val, 1); + } + + @Override + public void permutationMatrixMultOperations(MatrixValue m2Val, MatrixValue out1Val, MatrixValue out2Val, int k) + throws DMLRuntimeException { + printDecompressWarning("permutationMatrixMultOperations", (MatrixBlock)m2Val); + MatrixBlock left = isCompressed() ? decompress() : this; + MatrixBlock right = getUncompressed(m2Val); + left.permutationMatrixMultOperations(right, out1Val, out2Val, k); + } + + @Override + public MatrixBlock leftIndexingOperations(MatrixBlock rhsMatrix, int rl, int ru, int cl, int cu, MatrixBlock ret, UpdateType update) + throws DMLRuntimeException { + printDecompressWarning("leftIndexingOperations"); + MatrixBlock left = isCompressed() ? decompress() : this; + MatrixBlock right = getUncompressed(rhsMatrix); + return left.leftIndexingOperations(right, rl, ru, cl, cu, ret, update); + } + + @Override + public MatrixBlock leftIndexingOperations(ScalarObject scalar, int rl, int cl, MatrixBlock ret, UpdateType update) + throws DMLRuntimeException { + printDecompressWarning("leftIndexingOperations"); + MatrixBlock tmp = isCompressed() ? decompress() : this; + return tmp.leftIndexingOperations(scalar, rl, cl, ret, update); + } + + @Override + public MatrixBlock sliceOperations(int rl, int ru, int cl, int cu, CacheBlock ret) + throws DMLRuntimeException { + printDecompressWarning("sliceOperations"); + MatrixBlock tmp = isCompressed() ? decompress() : this; + return tmp.sliceOperations(rl, ru, cl, cu, ret); + } + + @Override + public void sliceOperations(ArrayList<IndexedMatrixValue> outlist, IndexRange range, + int rowCut, int colCut, int normalBlockRowFactor, + int normalBlockColFactor, int boundaryRlen, int boundaryClen) { + printDecompressWarning("sliceOperations"); + try { + MatrixBlock tmp = isCompressed() ? decompress() : this; + tmp.sliceOperations(outlist, range, rowCut, colCut, normalBlockRowFactor, + normalBlockColFactor, boundaryRlen, boundaryClen); + } + catch(DMLRuntimeException ex) { + throw new RuntimeException(ex); + } + } + + @Override + public MatrixValue zeroOutOperations(MatrixValue result, IndexRange range, boolean complementary) + throws DMLRuntimeException { + printDecompressWarning("zeroOutOperations"); + MatrixBlock tmp = isCompressed() ? decompress() : this; + return tmp.zeroOutOperations(result, range, complementary); + } + + @Override + public MatrixValue aggregateUnaryOperations(AggregateUnaryOperator op, + MatrixValue result, int blockingFactorRow, int blockingFactorCol, + MatrixIndexes indexesIn) throws DMLRuntimeException { + printDecompressWarning("aggregateUnaryOperations"); + MatrixBlock tmp = isCompressed() ? decompress() : this; + return tmp.aggregateUnaryOperations(op, result, blockingFactorRow, blockingFactorCol, indexesIn); + } + + @Override + public CM_COV_Object cmOperations(CMOperator op) throws DMLRuntimeException { + printDecompressWarning("cmOperations"); + MatrixBlock tmp = isCompressed() ? decompress() : this; + return tmp.cmOperations(op); + } + + @Override + public CM_COV_Object cmOperations(CMOperator op, MatrixBlock weights) + throws DMLRuntimeException { + printDecompressWarning("cmOperations"); + MatrixBlock left = isCompressed() ? decompress() : this; + MatrixBlock right = getUncompressed(weights); + return left.cmOperations(op, right); + } + + @Override + public CM_COV_Object covOperations(COVOperator op, MatrixBlock that) + throws DMLRuntimeException { + printDecompressWarning("covOperations"); + MatrixBlock left = isCompressed() ? decompress() : this; + MatrixBlock right = getUncompressed(that); + return left.covOperations(op, right); + } + + @Override + public CM_COV_Object covOperations(COVOperator op, MatrixBlock that, MatrixBlock weights) + throws DMLRuntimeException { + printDecompressWarning("covOperations"); + MatrixBlock left = isCompressed() ? decompress() : this; + MatrixBlock right1 = getUncompressed(that); + MatrixBlock right2 = getUncompressed(weights); + return left.covOperations(op, right1, right2); + } + + @Override + public MatrixValue sortOperations(MatrixValue weights, MatrixValue result) + throws DMLRuntimeException { + printDecompressWarning("sortOperations"); + MatrixBlock left = isCompressed() ? decompress() : this; + MatrixBlock right = getUncompressed(weights); + return left.sortOperations(right, result); + } + + @Override + public MatrixValue aggregateBinaryOperations(MatrixIndexes m1Index, + MatrixValue m1Value, MatrixIndexes m2Index, MatrixValue m2Value, + MatrixValue result, AggregateBinaryOperator op) + throws DMLRuntimeException { + printDecompressWarning("aggregateBinaryOperations"); + MatrixBlock left = isCompressed() ? decompress() : this; + MatrixBlock right = getUncompressed(m2Value); + return left.aggregateBinaryOperations(m1Index, left, m2Index, right, result, op); + } + + @Override + public ScalarObject aggregateTernaryOperations(MatrixBlock m1, MatrixBlock m2, MatrixBlock m3, AggregateBinaryOperator op) + throws DMLRuntimeException { + printDecompressWarning("aggregateTernaryOperations"); + MatrixBlock left = isCompressed() ? decompress() : this; + MatrixBlock right1 = getUncompressed(m2); + MatrixBlock right2 = getUncompressed(m3); + return left.aggregateTernaryOperations(left, right1, right2, op); + } + + @Override + public MatrixBlock uaggouterchainOperations(MatrixBlock mbLeft, MatrixBlock mbRight, + MatrixBlock mbOut, BinaryOperator bOp, AggregateUnaryOperator uaggOp) + throws DMLRuntimeException { + printDecompressWarning("uaggouterchainOperations"); + MatrixBlock left = isCompressed() ? decompress() : this; + MatrixBlock right = getUncompressed(mbRight); + return left.uaggouterchainOperations(left, right, mbOut, bOp, uaggOp); + } + + @Override + public MatrixBlock groupedAggOperations(MatrixValue tgt, MatrixValue wghts, MatrixValue ret, int ngroups, Operator op) + throws DMLRuntimeException { + return groupedAggOperations(tgt, wghts, ret, ngroups, op, 1); + } + + @Override + public MatrixBlock groupedAggOperations(MatrixValue tgt, MatrixValue wghts, + MatrixValue ret, int ngroups, Operator op, int k) + throws DMLRuntimeException { + printDecompressWarning("groupedAggOperations"); + MatrixBlock left = isCompressed() ? decompress() : this; + MatrixBlock right = getUncompressed(wghts); + return left.groupedAggOperations(left, right, ret, ngroups, op, k); + } + + @Override + public MatrixBlock removeEmptyOperations(MatrixBlock ret, boolean rows, MatrixBlock select) + throws DMLRuntimeException { + printDecompressWarning("removeEmptyOperations"); + MatrixBlock tmp = isCompressed() ? decompress() : this; + return tmp.removeEmptyOperations(ret, rows, select); + } + + @Override + public MatrixBlock removeEmptyOperations(MatrixBlock ret, boolean rows) + throws DMLRuntimeException { + printDecompressWarning("removeEmptyOperations"); + MatrixBlock tmp = isCompressed() ? decompress() : this; + return tmp.removeEmptyOperations(ret, rows); + } + + @Override + public MatrixBlock rexpandOperations(MatrixBlock ret, double max, + boolean rows, boolean cast, boolean ignore) + throws DMLRuntimeException { + printDecompressWarning("rexpandOperations"); + MatrixBlock tmp = isCompressed() ? decompress() : this; + return tmp.rexpandOperations(ret, max, rows, cast, ignore); + } + + @Override + public MatrixValue replaceOperations(MatrixValue result, double pattern, double replacement) + throws DMLRuntimeException { + printDecompressWarning("replaceOperations"); + MatrixBlock tmp = isCompressed() ? decompress() : this; + return tmp.replaceOperations(result, pattern, replacement); + } + + @Override + public void ternaryOperations(Operator op, double scalar, + MatrixValue that, CTableMap resultMap, MatrixBlock resultBlock) + throws DMLRuntimeException { + printDecompressWarning("ternaryOperations"); + MatrixBlock left = isCompressed() ? decompress() : this; + MatrixBlock right = getUncompressed(that); + left.ternaryOperations(op, scalar, right, resultMap, resultBlock); + } + + @Override + public void ternaryOperations(Operator op, double scalar, + double scalar2, CTableMap resultMap, MatrixBlock resultBlock) + throws DMLRuntimeException { + printDecompressWarning("ternaryOperations"); + MatrixBlock tmp = isCompressed() ? decompress() : this; + tmp.ternaryOperations(op, scalar, scalar2, resultMap, resultBlock); + } + + @Override + public void ternaryOperations(Operator op, MatrixIndexes ix1, + double scalar, boolean left, int brlen, CTableMap resultMap, + MatrixBlock resultBlock) throws DMLRuntimeException { + printDecompressWarning("ternaryOperations"); + MatrixBlock tmp = isCompressed() ? decompress() : this; + tmp.ternaryOperations(op, ix1, scalar, left, brlen, resultMap, resultBlock); + } + + @Override + public void ternaryOperations(Operator op, MatrixValue that, + double scalar, boolean ignoreZeros, CTableMap resultMap, + MatrixBlock resultBlock) throws DMLRuntimeException { + printDecompressWarning("ternaryOperations"); + MatrixBlock left = isCompressed() ? decompress() : this; + MatrixBlock right = getUncompressed(that); + left.ternaryOperations(op, right, scalar, ignoreZeros, resultMap, resultBlock); + } + + @Override + public void ternaryOperations(Operator op, MatrixValue that, double scalar, MatrixBlock resultBlock) + throws DMLRuntimeException { + printDecompressWarning("ternaryOperations"); + MatrixBlock left = isCompressed() ? decompress() : this; + MatrixBlock right = getUncompressed(that); + left.ternaryOperations(op, right, scalar, resultBlock); + } + + @Override + public void ternaryOperations(Operator op, MatrixValue that, + MatrixValue that2, CTableMap resultMap) + throws DMLRuntimeException { + printDecompressWarning("ternaryOperations"); + MatrixBlock left = isCompressed() ? decompress() : this; + MatrixBlock right1 = getUncompressed(that); + MatrixBlock right2 = getUncompressed(that2); + left.ternaryOperations(op, right1, right2, resultMap); + } + + @Override + public void ternaryOperations(Operator op, MatrixValue that, + MatrixValue that2, CTableMap resultMap, MatrixBlock resultBlock) + throws DMLRuntimeException { + printDecompressWarning("ternaryOperations"); + MatrixBlock left = isCompressed() ? decompress() : this; + MatrixBlock right1 = getUncompressed(that); + MatrixBlock right2 = getUncompressed(that2); + left.ternaryOperations(op, right1, right2, resultMap, resultBlock); + } + + @Override + public MatrixValue quaternaryOperations(QuaternaryOperator qop, + MatrixValue um, MatrixValue vm, MatrixValue wm, MatrixValue out) + throws DMLRuntimeException { + return quaternaryOperations(qop, um, vm, wm, out, 1); + } + + @Override + public MatrixValue quaternaryOperations(QuaternaryOperator qop, MatrixValue um, + MatrixValue vm, MatrixValue wm, MatrixValue out, int k) + throws DMLRuntimeException { + printDecompressWarning("quaternaryOperations"); + MatrixBlock left = isCompressed() ? decompress() : this; + MatrixBlock right1 = getUncompressed(um); + MatrixBlock right2 = getUncompressed(vm); + MatrixBlock right3 = getUncompressed(wm); + return left.quaternaryOperations(qop, right1, right2, right3, out, k); + } + + @Override + public MatrixBlock randOperationsInPlace(RandomMatrixGenerator rgen, + long[] nnzInBlock, Well1024a bigrand, long bSeed) + throws DMLRuntimeException { + throw new RuntimeException("CompressedMatrixBlock: randOperationsInPlace not supported."); + } + + @Override + public MatrixBlock randOperationsInPlace(RandomMatrixGenerator rgen, + long[] nnzInBlock, Well1024a bigrand, long bSeed, int k) + throws DMLRuntimeException { + throw new RuntimeException("CompressedMatrixBlock: randOperationsInPlace not supported."); + } + + @Override + public MatrixBlock seqOperationsInPlace(double from, double to, double incr) + throws DMLRuntimeException { + //output should always be uncompressed + throw new RuntimeException("CompressedMatrixBlock: seqOperationsInPlace not supported."); + } + + /** + * + * @param mb + * @return + */ + private static boolean isCompressed(MatrixBlock mb) { + return (mb instanceof CompressedMatrixBlock && ((CompressedMatrixBlock)mb).isCompressed()); + } + + /** + * + * @param mVal + * @return + * @throws DMLRuntimeException + */ + private static MatrixBlock getUncompressed(MatrixValue mVal) + throws DMLRuntimeException { + return isCompressed((MatrixBlock)mVal) ? + ((CompressedMatrixBlock)mVal).decompress() : + (MatrixBlock)mVal; + } + + /** + * + * @param operation + */ + private void printDecompressWarning(String operation) { + if( isCompressed() ) { + LOG.warn("Operation '"+operation+"' not supported yet - decompressing for ULA operations."); + } + } + + /** + * + * @param operation + * @param m2 + */ + private void printDecompressWarning(String operation, MatrixBlock m2) { + if( isCompressed() || isCompressed(m2) ) { + LOG.warn("Operation '"+operation+"' not supported yet - decompressing for ULA operations."); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b233b599/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java index a7654cc..48b0b2d 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java @@ -3891,7 +3891,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab * @return * @throws DMLRuntimeException */ - public MatrixBlock leftIndexingOperations(MatrixBlock rhsMatrix, IndexRange ixrange, MatrixBlock ret, UpdateType update) + public final MatrixBlock leftIndexingOperations(MatrixBlock rhsMatrix, IndexRange ixrange, MatrixBlock ret, UpdateType update) throws DMLRuntimeException { return leftIndexingOperations( @@ -4029,7 +4029,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab * @return * @throws DMLRuntimeException */ - public MatrixBlock sliceOperations(IndexRange ixrange, MatrixBlock ret) throws DMLRuntimeException { + public final MatrixBlock sliceOperations(IndexRange ixrange, MatrixBlock ret) throws DMLRuntimeException { return sliceOperations( (int)ixrange.rowStart, (int)ixrange.rowEnd, (int)ixrange.colStart, (int)ixrange.colEnd, ret);
