This is an automated email from the ASF dual-hosted git repository. baunsgaard pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/systemds.git
commit d0895af4b58c2d1838cc6e74e1bc81165f3e2c62 Author: baunsgaard <[email protected]> AuthorDate: Mon Aug 10 18:34:03 2020 +0200 [SYSTEMDS-2614] Compressed Left Matrix Mult - This commit also contains the column mean operation. - Change left multiply to multiply many rows in different - offsets Introduced to argument list for matrixmult to allow different parallelization scheme - multiply un-compress-able columns as well Closes #1016 --- .../apache/sysds/hops/rewrite/ProgramRewriter.java | 1 + .../runtime/compress/CompressedMatrixBlock.java | 359 +++++++++++++++++---- .../compress/CompressedMatrixBlockFactory.java | 6 +- .../sysds/runtime/compress/colgroup/ColGroup.java | 32 +- .../runtime/compress/colgroup/ColGroupDDC.java | 128 +++++--- .../runtime/compress/colgroup/ColGroupOLE.java | 113 ++++++- .../runtime/compress/colgroup/ColGroupOffset.java | 12 - .../runtime/compress/colgroup/ColGroupRLE.java | 97 +++++- .../runtime/compress/colgroup/ColGroupSizes.java | 9 - .../compress/colgroup/ColGroupUncompressed.java | 38 +-- .../runtime/compress/colgroup/ColGroupValue.java | 14 +- .../runtime/compress/utils/LinearAlgebraUtils.java | 2 + .../sysds/runtime/matrix/data/MatrixBlock.java | 7 + .../compress/AbstractCompressedUnaryTests.java | 42 ++- .../component/compress/CompressedTestBase.java | 134 ++++++-- .../test/component/compress/TestConstants.java | 2 +- 16 files changed, 785 insertions(+), 211 deletions(-) diff --git a/src/main/java/org/apache/sysds/hops/rewrite/ProgramRewriter.java b/src/main/java/org/apache/sysds/hops/rewrite/ProgramRewriter.java index a225047..87df183 100644 --- a/src/main/java/org/apache/sysds/hops/rewrite/ProgramRewriter.java +++ b/src/main/java/org/apache/sysds/hops/rewrite/ProgramRewriter.java @@ -52,6 +52,7 @@ public class ProgramRewriter private ArrayList<HopRewriteRule> _dagRuleSet = null; private ArrayList<StatementBlockRewriteRule> _sbRuleSet = null; + public ProgramRewriter() { // by default which is used during initial compile // 
apply all (static and dynamic) rewrites diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java index 6fef4b5..db2b8ba 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java +++ b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java @@ -45,7 +45,7 @@ import org.apache.sysds.runtime.DMLRuntimeException; import org.apache.sysds.runtime.compress.colgroup.ColGroup; import org.apache.sysds.runtime.compress.colgroup.ColGroup.CompressionType; import org.apache.sysds.runtime.compress.colgroup.ColGroupConverter; -import org.apache.sysds.runtime.compress.colgroup.ColGroupDDC1; +import org.apache.sysds.runtime.compress.colgroup.ColGroupDDC; import org.apache.sysds.runtime.compress.colgroup.ColGroupIO; import org.apache.sysds.runtime.compress.colgroup.ColGroupUncompressed; import org.apache.sysds.runtime.compress.colgroup.ColGroupValue; @@ -55,6 +55,7 @@ import org.apache.sysds.runtime.compress.utils.ColumnGroupIterator; import org.apache.sysds.runtime.compress.utils.LinearAlgebraUtils; import org.apache.sysds.runtime.controlprogram.caching.MatrixObject.UpdateType; import org.apache.sysds.runtime.controlprogram.parfor.stat.Timing; +import org.apache.sysds.runtime.data.DenseBlock; import org.apache.sysds.runtime.data.SparseBlock; import org.apache.sysds.runtime.data.SparseRow; import org.apache.sysds.runtime.functionobjects.Builtin; @@ -418,24 +419,28 @@ public class CompressedMatrixBlock extends AbstractCompressedMatrixBlock { } - protected void binaryMV(MatrixBlock m2, CompressedMatrixBlock ret, BinaryOperator op, BinaryAccessType aType ){ - if(aType == BinaryAccessType.MATRIX_COL_VECTOR){ + protected void binaryMV(MatrixBlock m2, CompressedMatrixBlock ret, BinaryOperator op, BinaryAccessType aType) { + if(aType == BinaryAccessType.MATRIX_COL_VECTOR) { throw new NotImplementedException("Binary Matrix Col Vector 
operations are not implemented CLA"); - }else if(aType== BinaryAccessType.MATRIX_ROW_VECTOR){ + } + else if(aType == BinaryAccessType.MATRIX_ROW_VECTOR) { // Apply the operation to each of the column groups. // Most implementations will only modify metadata. ArrayList<ColGroup> newColGroups = new ArrayList<>(); for(ColGroup grp : _colGroups) { - if(grp instanceof ColGroupUncompressed){ + if(grp instanceof ColGroupUncompressed) { LOG.error("NOT HANDLING UNCOMPRESSED IN BINARY MV"); - }else{ + } + else { - if(grp.getNumCols() == 1){ - ScalarOperator sop = new LeftScalarOperator(op.fn, m2.getValue(0, grp.getColIndices()[0]),1); + if(grp.getNumCols() == 1) { + ScalarOperator sop = new LeftScalarOperator(op.fn, m2.getValue(0, grp.getColIndices()[0]), 1); newColGroups.add(grp.scalarOperation(sop)); - }else{ - throw new NotImplementedException("Cocoded columns (nr cols:" + grp.getNumCols() + ") groupType: not implemented for Binary Matrix Row Vector operations"); + } + else { + throw new NotImplementedException("Cocoded columns (nr cols:" + grp.getNumCols() + + ") groupType: not implemented for Binary Matrix Row Vector operations"); } } // newColGroups.add(grp.binaryMVR(m2, op)); @@ -446,11 +451,11 @@ public class CompressedMatrixBlock extends AbstractCompressedMatrixBlock { } } - protected void binaryVV(MatrixBlock m2, CompressedMatrixBlock ret, BinaryOperator op, BinaryAccessType aType ){ + protected void binaryVV(MatrixBlock m2, CompressedMatrixBlock ret, BinaryOperator op, BinaryAccessType aType) { throw new NotImplementedException("Binary Vector Vector operations are not implemented"); } - protected void binaryMM(MatrixBlock m2, CompressedMatrixBlock ret, BinaryOperator op){ + protected void binaryMM(MatrixBlock m2, CompressedMatrixBlock ret, BinaryOperator op) { throw new NotImplementedException("Binary Matrix Matrix operations are not implemented"); } @@ -584,15 +589,6 @@ public class CompressedMatrixBlock extends AbstractCompressedMatrixBlock { public MatrixBlock 
aggregateBinaryOperations(MatrixBlock m1, MatrixBlock m2, MatrixBlock ret, AggregateBinaryOperator op) { - // Should not happen that it is a single uncompressed group. - // multi-threaded MM of single uncompressed ColGroup - // if(isSingleUncompressedGroup()) { - // MatrixBlock tmp = ((ColGroupUncompressed) _colGroups.get(0)).getData(); - // return tmp.aggregateBinaryOperations(this == m1 ? tmp : m1, this == m2 ? tmp : m2, ret, op); - // } - - Timing time = LOG.isDebugEnabled() ? new Timing(true) : null; - // setup meta data (dimensions, sparsity) int rl = m1.getNumRows(); int cl = m2.getNumColumns(); @@ -606,11 +602,22 @@ public class CompressedMatrixBlock extends AbstractCompressedMatrixBlock { // compute matrix mult if(m1.getNumRows() > 1 && m2.getNumColumns() == 1) { // MV right LOG.debug("Matrix Vector !"); - CompressedMatrixBlock cmb = (CompressedMatrixBlock) m1; - if(op.getNumThreads() > 1) - cmb.rightMultByVector(m2, ret, op.getNumThreads()); - else - cmb.rightMultByVector(m2, ret); + if(m1 == this) { + if(op.getNumThreads() > 1) + rightMultByVector(m2, ret, op.getNumThreads()); + else + rightMultByVector(m2, ret); + } + else if(m2 == this) { + // MatrixBlock tmpIn = new MatrixBlock(1, 1, false).allocateBlock(); + + leftMultByMatrix(_colGroups, m1, ret, op.getNumThreads(), 1); + + } + else { + throw new DMLRuntimeException( + "Error in execution of aggregate Binary Operation, where m1 or m2 is not this"); + } } else if(m1.getNumRows() == 1 && m2.getNumColumns() > 1) { // MV left LOG.debug("Vector Matrix"); @@ -650,16 +657,12 @@ public class CompressedMatrixBlock extends AbstractCompressedMatrixBlock { } } else { // MM left - LOG.debug("MM left"); - for(int i = 0; i < that.getNumRows(); i++) { - tmpIn = that.slice(i, i, 0, that.getNumColumns() - 1, tmpIn); - if(op.getNumThreads() > 1) - leftMultByVectorTranspose(_colGroups, tmpIn, tmpOut, false, op.getNumThreads()); - else - leftMultByVectorTranspose(_colGroups, tmpIn, tmpOut, false, true); - 
ret.leftIndexingOperations(tmpOut, i, i, 0, ret.getNumColumns() - 1, ret, UpdateType.INPLACE); - } + // if(op.getNumThreads() > 1) + // leftMultByMatrixTranspose(_colGroups, m1, ret, false, op.getNumThreads()); + // else + leftMultByMatrix(_colGroups, m1, ret, op.getNumThreads(), getNumColumns()); + } } @@ -672,6 +675,7 @@ public class CompressedMatrixBlock extends AbstractCompressedMatrixBlock { // check for supported operations if(!(op.aggOp.increOp.fn instanceof KahanPlus || op.aggOp.increOp.fn instanceof KahanPlusSq || + op.aggOp.increOp.fn instanceof Mean || (op.aggOp.increOp.fn instanceof Builtin && (((Builtin) op.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MIN || ((Builtin) op.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MAX)))) { @@ -729,7 +733,7 @@ public class CompressedMatrixBlock extends AbstractCompressedMatrixBlock { // core unary aggregate if(op.getNumThreads() > 1 && getExactSizeOnDisk() > MIN_PAR_AGG_THRESHOLD) { // multi-threaded execution of all groups - ArrayList<ColGroup>[] grpParts = createStaticTaskPartitioning( + ArrayList<ColGroup>[] grpParts = createStaticTaskPartitioning(_colGroups, (op.indexFn instanceof ReduceCol) ? 
1 : op.getNumThreads(), false); ColGroupUncompressed uc = getUncompressedColGroup(); @@ -800,8 +804,9 @@ public class CompressedMatrixBlock extends AbstractCompressedMatrixBlock { // special handling of mean if(op.aggOp.increOp.fn instanceof Mean) { - if(op.indexFn instanceof ReduceAll) + if(op.indexFn instanceof ReduceAll) { ret.quickSetValue(0, 0, ret.quickGetValue(0, 0) / (getNumColumns() * getNumRows())); + } else if(op.indexFn instanceof ReduceCol) { for(int i = 0; i < getNumRows(); i++) { ret.quickSetValue(i, 0, ret.quickGetValue(i, 0) / getNumColumns()); @@ -833,7 +838,7 @@ public class CompressedMatrixBlock extends AbstractCompressedMatrixBlock { int rl, int ru) { // Seems misplaced logic for when to use CacheDDC - boolean cacheDDC1 = false; + // boolean cacheDDC1 = false; // op.indexFn instanceof ReduceCol && op.aggOp.increOp.fn instanceof KahanPlus // rowSums // && ColGroupOffset.ALLOW_CACHE_CONSCIOUS_ROWSUMS && ru - rl > CompressionSettings.BITMAP_BLOCK_SZ; @@ -856,7 +861,7 @@ public class CompressedMatrixBlock extends AbstractCompressedMatrixBlock { // throw new RuntimeException("aggregateUnaryOperation failed to materialize matrix data"); } for(ColGroup grp : groups) - if(!(grp instanceof ColGroupUncompressed) && !(cacheDDC1 && grp instanceof ColGroupDDC1)) + if(!(grp instanceof ColGroupUncompressed)) grp.unaryAggregateOperations(op, c, rl, ru); // LOG.debug(Arrays.toString(c)); } @@ -984,6 +989,7 @@ public class CompressedMatrixBlock extends AbstractCompressedMatrixBlock { for(int i = 0; i < k & i * blklen < getNumRows(); i++) tasks.add( new RightMatrixMultTask(_colGroups, vector, result, i * blklen, Math.min((i + 1) * blklen, rlen))); + List<Future<Long>> ret = pool.invokeAll(tasks); pool.shutdown(); @@ -993,7 +999,7 @@ public class CompressedMatrixBlock extends AbstractCompressedMatrixBlock { lnnz += tmp.get(); result.setNonZeros(lnnz); } - catch(InterruptedException | ExecutionException e) { + catch(Exception e) { LOG.fatal(e); throw new 
DMLRuntimeException(e); } @@ -1058,17 +1064,18 @@ public class CompressedMatrixBlock extends AbstractCompressedMatrixBlock { result.allocateDenseBlock(); // setup memory pool for reuse - if(allocTmp){ + if(allocTmp) { Pair<Integer, List<Integer>> v = getMaxNumValues(colGroups); ColGroupValue.setupThreadLocalMemory(v.getLeft()); - for(int i = 0; i< colGroups.size(); i++){ - colGroups.get(i).leftMultByRowVector(rowVector, result, v.getRight().get(i)); + for(int i = 0; i < colGroups.size(); i++) { + colGroups.get(i).leftMultByRowVector(rowVector.getDenseBlockValues(), + result.getDenseBlockValues(), + v.getRight().get(i)); } } - else - { + else { for(ColGroup grp : colGroups) { - grp.leftMultByRowVector(rowVector, result, -1); + grp.leftMultByRowVector(rowVector.getDenseBlockValues(), result.getDenseBlockValues(), -1); } } @@ -1121,10 +1128,10 @@ public class CompressedMatrixBlock extends AbstractCompressedMatrixBlock { // compute remaining compressed column groups in parallel ExecutorService pool = CommonThreadPool.get(Math.min(colGroups.size(), k)); - ArrayList<ColGroup>[] grpParts = createStaticTaskPartitioning(4 * k, true); - ArrayList<LeftMatrixMultTask> tasks = new ArrayList<>(); + ArrayList<ColGroup>[] grpParts = createStaticTaskPartitioning(_colGroups, 4 * k, true); + ArrayList<LeftMatrixVectorMultTask> tasks = new ArrayList<>(); for(ArrayList<ColGroup> groups : grpParts) - tasks.add(new LeftMatrixMultTask(groups, rowVector, result)); + tasks.add(new LeftMatrixVectorMultTask(groups, rowVector, result)); List<Future<Object>> ret; ret = pool.invokeAll(tasks); @@ -1143,6 +1150,145 @@ public class CompressedMatrixBlock extends AbstractCompressedMatrixBlock { result.recomputeNonZeros(); } + /** + * Multiply this matrix block by a matrix (i.e. 
v%*%X) + * + * @param colGroups List of column groups + * @param that Left-hand operand of the multiplication + * @param ret The result matrix to insert the results + * @param tmp buffer to hold the result; must have the appropriate size already + * @param tmpIn buffer to hold a since row of input. + * @param k The number of threads used + * @param numColumns The number of columns in this colGroup + */ + private static void leftMultByMatrix(List<ColGroup> colGroups, MatrixBlock that, MatrixBlock ret, int k, + int numColumns) { + // transpose vector if required + // MatrixBlock result = new MatrixBlock(1, getNumColumns(), false).allocateBlock(); + // if(op.getNumThreads() > 1) + // leftMultByMatrixTranspose(_colGroups, tmpIn, tmpOut, false, op.getNumThreads()); + // else + // leftMultByMatrixTranspose(_colGroups, tmpIn, tmpOut, false, true); + + // if(doTranspose) { + // rowVector = new MatrixBlock(1, vector.getNumRows(), false); + // LibMatrixReorg.transpose(vector, rowVector); + // } + + ret.reset(); + ret.allocateDenseBlock(); + // double[] retV = ret.getDenseBlockValues(); + // double[] thatV; + + // initialize and allocate the result + // that.allocateDenseBlock(); + // DenseBlock thatBlock = that.getDenseBlock(); + // thatBlock.numBlocks(); + + // int blockSize = 25;// numColumns + if(that.isInSparseFormat()) { + leftMultBySparseMatrix(colGroups, that, ret, k, numColumns); + } + else { + leftMultByDenseMatrix(colGroups, that, ret, k, numColumns); + } + ret.recomputeNonZeros(); + } + + private static void leftMultByDenseMatrix(List<ColGroup> colGroups, MatrixBlock that, MatrixBlock ret, int k, + int numColumns) { + DenseBlock db = that.getDenseBlock(); + double[] retV = ret.getDenseBlockValues(); + double[] thatV; + int blockU; + int blockL = 0; + + + for(ColGroup grp : colGroups) { + if(grp instanceof ColGroupUncompressed) + ((ColGroupUncompressed) grp).leftMultByMatrix(that, ret); + } + + // System.out.println(db.numBlocks()); + // 
System.out.println(that.getDenseBlock().getClass().getSimpleName()); + for(int b = 0; b <= db.numBlocks(); b++) { + int blockSize = db.blockSize(b); + blockU = Math.min(blockL + blockSize, ret.getNumRows()); + thatV = db.valuesAt(b); + // System.out.println("Length of values in block " + (thatV.length)); + + if(k == 1) { + // TODO make move singlethreaded to not materialize and getMaxNumValues multiple times. + double[][] materialized = new double[colGroups.size()][]; + // byte[][] materializedByte = new byte[colGroups.size()][]; + for(int i = 0; i < colGroups.size(); i++) { + // if(colGroups.get(i) instanceof ColGroupDDC && colGroups.get(i).isLossy()) { + // materializedByte[i] = ((ColGroupDDC) colGroups.get(i)).getByteValues(); + // } + // else { + materialized[i] = colGroups.get(i).getValues(); + // } + } + Pair<Integer, List<Integer>> v = getMaxNumValues(colGroups); + for(int j = 0; j < colGroups.size(); j++) + colGroups.get(j).leftMultByMatrix(thatV, + retV, + v.getRight().get(j), + materialized[j], + that.getNumRows(), + ret.getNumColumns(), + blockL, + blockU, + 0); + } + else { + + try { + ExecutorService pool = CommonThreadPool.get(Math.min(colGroups.size(), k)); + // compute remaining compressed column groups in parallel + ArrayList<LeftMatrixMatrixMultTask> tasks = new ArrayList<>(); + List<ColGroup>[] parts = createStaticTaskPartitioningForMatrixMult(colGroups, k, false); + int rowBlockSize = 2; + for(List<ColGroup> part : parts) { + for(int blo = blockL; blo < blockU; blo += rowBlockSize) { + // int voff = (blo - blockL) * that.getNumColumns(); + + tasks.add(new LeftMatrixMatrixMultTask(part, thatV, retV, that.getNumRows(), numColumns, + blo, Math.min(blo + rowBlockSize, blockU), blo - blockL)); + + } + // tasks.add(new LeftMatrixMatrixMultTask(part, thatV, retV, that.getNumRows(), numColumns, + // blockL, blockU, 0)); + // if(colGroups.get(j) instanceof ColGroupDDC && colGroups.get(j).isLossy()) { + // tasks.add(new 
LeftMatrixMatrixMultTask(colGroups.get(j), thatSlice, tmp, + // v.getRight().get(j), materializedByte[j])); + // } + // else { + } + + List<Future<Object>> futures = pool.invokeAll(tasks); + + pool.shutdown(); + for(Future<Object> future : futures) + future.get(); + } + catch(InterruptedException | ExecutionException e) { + e.printStackTrace(); + throw new DMLRuntimeException(e); + } + + } + blockL += blockSize; + } + } + + private static void leftMultBySparseMatrix(List<ColGroup> colGroups, MatrixBlock that, MatrixBlock ret, int k, + int numColumns) { + + // SparseBlock sb = that.getSparseBlock(); + throw new NotImplementedException("Sparse Block input not handled."); + } + private static void leftMultByTransposeSelf(List<ColGroup> groups, MatrixBlock result, int gl, int gu) { final int numRows = groups.get(0).getNumRows(); final int numGroups = groups.size(); @@ -1194,18 +1340,19 @@ public class CompressedMatrixBlock extends AbstractCompressedMatrixBlock { } @SuppressWarnings("unchecked") - private ArrayList<ColGroup>[] createStaticTaskPartitioning(int k, boolean inclUncompressed) { + private static ArrayList<ColGroup>[] createStaticTaskPartitioning(List<ColGroup> colGroups, int k, + boolean inclUncompressed) { // special case: single uncompressed col group - if(_colGroups.size() == 1 && _colGroups.get(0) instanceof ColGroupUncompressed) { + if(colGroups.size() == 1 && colGroups.get(0) instanceof ColGroupUncompressed) { return new ArrayList[0]; } // initialize round robin col group distribution // (static task partitioning to reduce mem requirements/final agg) - int numTasks = Math.min(k, _colGroups.size()); + int numTasks = Math.min(k, colGroups.size()); ArrayList<ColGroup>[] grpParts = new ArrayList[numTasks]; int pos = 0; - for(ColGroup grp : _colGroups) { + for(ColGroup grp : colGroups) { if(grpParts[pos] == null) grpParts[pos] = new ArrayList<>(); if(inclUncompressed || !(grp instanceof ColGroupUncompressed)) { @@ -1217,16 +1364,44 @@ public class 
CompressedMatrixBlock extends AbstractCompressedMatrixBlock { return grpParts; } + @SuppressWarnings("unchecked") + private static List<ColGroup>[] createStaticTaskPartitioningForMatrixMult(List<ColGroup> colGroups, int k, + boolean inclUncompressed) { + int numTasks = Math.min(k, colGroups.size()); + List<ColGroup>[] grpParts = new ArrayList[numTasks]; + int pos = 0; + for(int i = 0; i < numTasks; i++) { + grpParts[pos++] = new ArrayList<>(); + } + pos = 0; + for(ColGroup grp : colGroups) { + + if(grp instanceof ColGroupDDC) { + grpParts[pos].add((ColGroupDDC) grp); + pos = (pos == numTasks - 1) ? 0 : pos + 1; + } + } + for(ColGroup grp : colGroups) { + if(!(grp instanceof ColGroupDDC) && (inclUncompressed || !(grp instanceof ColGroupUncompressed))) { + grpParts[pos].add(grp); + pos = (pos == numTasks - 1) ? 0 : pos + 1; + } + } + + return grpParts; + } + private static Pair<Integer, List<Integer>> getMaxNumValues(List<ColGroup> groups) { int numVals = 1; List<Integer> numValues = new ArrayList<>(groups.size()); int nr; for(ColGroup grp : groups) - if(grp instanceof ColGroupValue){ + if(grp instanceof ColGroupValue) { nr = ((ColGroupValue) grp).getNumValues(); numValues.add(nr); numVals = Math.max(numVals, nr); - } else{ + } + else { numValues.add(-1); } return new ImmutablePair<>(numVals, numValues); @@ -1244,12 +1419,12 @@ public class CompressedMatrixBlock extends AbstractCompressedMatrixBlock { return null; } - private static class LeftMatrixMultTask implements Callable<Object> { + private static class LeftMatrixVectorMultTask implements Callable<Object> { private final ArrayList<ColGroup> _groups; private final MatrixBlock _vect; private final MatrixBlock _ret; - protected LeftMatrixMultTask(ArrayList<ColGroup> groups, MatrixBlock vect, MatrixBlock ret) { + protected LeftMatrixVectorMultTask(ArrayList<ColGroup> groups, MatrixBlock vect, MatrixBlock ret) { _groups = groups; _vect = vect; _ret = ret; @@ -1261,8 +1436,10 @@ public class CompressedMatrixBlock 
extends AbstractCompressedMatrixBlock { try { Pair<Integer, List<Integer>> v = getMaxNumValues(_groups); ColGroupValue.setupThreadLocalMemory(v.getLeft()); - for(int i = 0; i< _groups.size(); i++){ - _groups.get(i).leftMultByRowVector(_vect, _ret, v.getRight().get(i)); + for(int i = 0; i < _groups.size(); i++) { + _groups.get(i).leftMultByRowVector(_vect.getDenseBlockValues(), + _ret.getDenseBlockValues(), + v.getRight().get(i)); } ColGroupValue.cleanupThreadLocalMemory(); @@ -1274,6 +1451,60 @@ public class CompressedMatrixBlock extends AbstractCompressedMatrixBlock { } } + private static class LeftMatrixMatrixMultTask implements Callable<Object> { + private final List<ColGroup> _group; + private final double[] _that; + private final double[] _ret; + private final int _numRows; + private final int _numCols; + private final int _rl; + private final int _ru; + private final int _vOff; + + protected LeftMatrixMatrixMultTask(List<ColGroup> group, double[] that, double[] ret, int numRows, int numCols, + int rl, int ru, int vOff) { + _group = group; + _that = that; + _ret = ret; + _numRows = numRows; + _numCols = numCols; + _rl = rl; + _ru = ru; + _vOff = vOff; + } + + @Override + public Object call() { + // setup memory pool for reuse + + double[][] materialized = new double[_group.size()][]; + for(int i = 0; i < _group.size(); i++) { + materialized[i] = _group.get(i).getValues(); + } + Pair<Integer, List<Integer>> v = getMaxNumValues(_group); + try { + ColGroupValue.setupThreadLocalMemory(v.getLeft()); + for(int j = 0; j < _group.size(); j++) { + _group.get(j).leftMultByMatrix(_that, + _ret, + v.getRight().get(j), + materialized[j], + _numRows, + _numCols, + _rl, + _ru, + _vOff); + } + ColGroupValue.cleanupThreadLocalMemory(); + + } + catch(Exception e) { + throw new DMLRuntimeException(e); + } + return null; + } + } + private static class RightMatrixMultTask implements Callable<Long> { private final List<ColGroup> _groups; private final MatrixBlock _vect; @@ -1426,4 
+1657,14 @@ public class CompressedMatrixBlock extends AbstractCompressedMatrixBlock { blklen % CompressionSettings.BITMAP_BLOCK_SZ : 0); } + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("\nCompressed Matrix:"); + sb.append("\nCols:" + getNumColumns() + " Rows:" + getNumRows()); + for(ColGroup cg : _colGroups) { + sb.append("\n" + cg); + } + return sb.toString(); + } } \ No newline at end of file diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java index fde7769..e7f3893 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java +++ b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java @@ -111,7 +111,7 @@ public class CompressedMatrixBlockFactory { LOG.debug("--compression phase 1: " + _stats.getLastTimePhase()); if(sizeInfos.colsC.isEmpty()) { - LOG.warn("Abort block compression because all columns are incompressible."); + LOG.info("Abort block compression because all columns are incompressible."); return new ImmutablePair<>(new MatrixBlock().copyShallow(mb), _stats); } // -------------------------------------------------- @@ -168,7 +168,7 @@ public class CompressedMatrixBlockFactory { _stats.ratio = _stats.originalSize / (double) _stats.size; if(_stats.ratio < 1) { - LOG.warn("Abort block compression because compression ratio is less than 1."); + LOG.info("Abort block compression because compression ratio is less than 1."); return new ImmutablePair<>(new MatrixBlock().copyShallow(mb), _stats); } @@ -179,7 +179,7 @@ public class CompressedMatrixBlockFactory { _stats.setNextTimePhase(time.stop()); _stats.setColGroupsCounts(colGroupList); - LOG.info("--num col groups: " + colGroupList.size() + ", -- num input cols: " + numCols); + LOG.debug("--num col groups: " + colGroupList.size() + ", -- num input cols: " + numCols); 
LOG.debug("--compression phase 5: " + _stats.getLastTimePhase()); LOG.debug("--col groups types " + _stats.getGroupsTypesString()); LOG.debug("--col groups sizes " + _stats.getGroupsSizesString()); diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroup.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroup.java index b150a57..0f6caea 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroup.java +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroup.java @@ -268,7 +268,35 @@ public abstract class ColGroup implements Serializable { * @param result matrix block result * @param numVals The Number of values contained in the Column. */ - public abstract void leftMultByRowVector(MatrixBlock vector, MatrixBlock result, int numVals); + public abstract void leftMultByRowVector(double[] vector, double[] result, int numVals); + + /** + * Multiply the slice of the matrix that this column group represents by a row vector on the left (the original + * column vector is assumed to be transposed already i.e. its size now is 1xn). + * + * @param vector row vector + * @param result matrix block result + * @param numVals The Number of values contained in the Column. + * @param values The materialized list of values contained in the dictionary. + */ + public abstract void leftMultByRowVector(double[] vector, double[] result, int numVals, double[] values); + + /** + * Multiply the slice of the matrix that this column group represents by a row vector on the left (the original + * column vector is assumed to be transposed already i.e. its size now is 1xn). + * + * @param matrix matrix to left multiply + * @param result matrix block result + * @param numVals The Number of values contained in the Column. + * @param values The materialized list of values contained in the dictionary. + * @param numRows The number of rows in the matrix input + * @param numCols The number of columns in the colGroups parent matrix. 
+ * @param rl The row to start the matrix multiplication from + * @param ru The row to stop the matrix multiplication at. + * @param vOff The offset into the first argument matrix to start at. + */ + public abstract void leftMultByMatrix(double[] matrix, double[] result, int numVals, double[] values, int numRows, + int numCols, int rl, int ru, int vOff); /** * Perform the specified scalar operation directly on the compressed column group, without decompressing individual @@ -279,8 +307,6 @@ public abstract class ColGroup implements Serializable { */ public abstract ColGroup scalarOperation(ScalarOperator op); - // public abstract ColGroup binaryMVR(MatrixBlock m2, BinaryOperator op); - /** * Unary Aggregate operator, since aggregate operators require new object output, the output becomes an uncompressed * matrix. diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java index a3c7487..92db020 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java @@ -140,7 +140,7 @@ public abstract class ColGroupDDC extends ColGroupValue { } @Override - protected void computeRowSums(double[] c, KahanFunction kplus, int rl, int ru) { + protected void computeRowSums(double[] c, KahanFunction kplus, int rl, int ru, boolean mean) { final int numVals = getNumValues(); KahanObject kbuff = new KahanObject(0, 0); KahanPlus kplus2 = KahanPlus.getKahanPlusFnObject(); @@ -150,7 +150,7 @@ public abstract class ColGroupDDC extends ColGroupValue { for(int rix = rl; rix < ru; rix++) { int index = getIndex(rix); if(index != numVals) { - setandExecute(c, kbuff, kplus2, vals[index], rix * 2); + setandExecute(c, kbuff, kplus2, vals[index], rix * (2 + (mean ? 
1 : 0))); } } } @@ -173,36 +173,19 @@ public abstract class ColGroupDDC extends ColGroupValue { } } - protected final void postScaling(double[] vals, double[] c, int numVals) { + public void postScaling(double[] values, double[] vals, double[] c, int numVals) { + postScaling(values, vals, c, numVals, 0, 0); + } + + public void postScaling(double[] values, double[] vals, double[] c, int numVals, int i, int totalCols) { final int ncol = getNumCols(); - // final int numVals = getNumValues(); - if(_dict instanceof QDictionary) { - QDictionary d = (QDictionary) _dict; - byte[] values = d.getValuesByte(); + for(int j = 0; j < ncol; j++) { + int colIx = _colIndexes[j] + i * totalCols; for(int k = 0, valOff = 0; k < numVals; k++, valOff += ncol) { double aval = vals[k]; if(valOff != numVals) { - for(int j = 0; j < ncol; j++) { - int colIx = _colIndexes[j]; - c[colIx] += aval * values[valOff + j]; - } - } - } - for(int j = 0; j < ncol; j++) { - int colIx = _colIndexes[j]; - c[colIx] = c[colIx] * d._scale; - } - } - else { - double[] values = getValues(); - for(int k = 0, valOff = 0; k < numVals; k++, valOff += ncol) { - double aval = vals[k]; - if(valOff != numVals) { - for(int j = 0; j < ncol; j++) { - int colIx = _colIndexes[j]; - c[colIx] += aval * values[valOff + j]; - } + c[colIx] += aval * values[valOff + j]; } } } @@ -248,34 +231,101 @@ public abstract class ColGroupDDC extends ColGroupValue { } @Override - public void leftMultByRowVector(MatrixBlock vector, MatrixBlock result, int numVals) { - double[] a = ColGroupConverter.getDenseVector(vector); - double[] c = result.getDenseBlockValues(); - numVals = (numVals == -1) ? getNumValues(): numVals; + public void leftMultByMatrix(double[] a, double[] c, int numVals, double[] values, int numRows, int numCols, int rl, + int ru, int voff) { + numVals = (numVals == -1) ? 
getNumValues() : numVals; + + for(int i = rl, j = voff; i < ru; i++, j++) { + if(8 * numVals < _numRows) { + // iterative over codes and pre-aggregate inputs per code (guaranteed <=255) + // temporary array also avoids false sharing in multi-threaded environments + double[] vals = preAggregate(a, numVals, j); + postScaling(values, vals, c, numVals, i, numCols); + } + else { + for(int k = 0, aOff = j *_numRows; k < _numRows; k++, aOff++) { + double aval = a[aOff]; + if(aval != 0) { + int valOff = getIndex(k) * _colIndexes.length; + if(valOff != numVals) { + for(int h = 0; h < _colIndexes.length; h++) { + int colIx = _colIndexes[h] + i * numCols; + c[colIx] += aval * values[valOff + h]; + } + } + } + } + } + } + } - if(8 * numVals < _numRows) { - // iterative over codes and pre-aggregate inputs per code (guaranteed <=255) - // temporary array also avoids false sharing in multi-threaded environments - double[] vals = allocDVector(numVals, true); + @Override + public void leftMultByRowVector(double[] a, double[] result, int numVals) { + numVals = (numVals == -1) ? getNumValues() : numVals; + double[] values = getValues(); + + leftMultByRowVector(a, result, numVals, values); + + } + + public double[] preAggregate(double[] a, int numVals) { + return preAggregate(a, numVals, 0); + } + + /** + * Pre aggregates a specific row from the input a which can be a row or a matrix. + * + * @param a the input vector or matrix to multiply with + * @param numVals the number of values contained in the dictionary + * @param aRows the row index from a + * @return the pre-aggregated values. + */ + public double[] preAggregate(double[] a, int numVals, int aRows) { + double[] vals; + if(aRows > 0) { + vals = allocDVector(numVals, true); + // int off = _numRows * aRows; + for(int i = 0, off = _numRows * aRows; i < _numRows; i++, off++) { + int index = getIndex(i); + if(index != numVals) { // Since we know that multiplying with 0 is .. 0 don't begin to aggregate. 
+ vals[index] += a[off]; + } + } + } + else { + vals = allocDVector(numVals, true); for(int i = 0; i < _numRows; i++) { int index = getIndex(i); if(index != numVals) { // Since we know that multiplying with 0 is .. 0 don't begin to aggregate. vals[index] += a[i]; } } - postScaling(vals, c, numVals); + } + return vals; + } + + @Override + public void leftMultByRowVector(double[] a, double[] c, int numVals, double[] values) { + // double[] c = result.getDenseBlockValues(); + numVals = (numVals == -1) ? getNumValues() : numVals; + + if(8 * numVals < _numRows) { + // iterative over codes and pre-aggregate inputs per code (guaranteed <=255) + // temporary array also avoids false sharing in multi-threaded environments + double[] vals = preAggregate(a, numVals); + postScaling(values, vals, c, numVals); } else { // iterate over codes, compute all, and add to the result - double[] values = getValues(); for(int i = 0; i < _numRows; i++) { double aval = a[i]; if(aval != 0) for(int j = 0, valOff = getIndex(i) * _colIndexes.length; j < _colIndexes.length; j++) - c[_colIndexes[j]] += aval * values[valOff + j]; + if(valOff != numVals) { + c[_colIndexes[j]] += aval * values[valOff + j]; + } } } - } @Override diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupOLE.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupOLE.java index e7bf0d2..44c0c47 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupOLE.java +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupOLE.java @@ -320,10 +320,12 @@ public class ColGroupOLE extends ColGroupOffset { int[] apos = skipScan(numVals, rl); double[] aval = preaggValues(numVals, sb); + // LOG.error(Arrays.toString(apos)); + // LOG.error(rl + " - " + ru); // step 2: cache conscious matrix-vector via horizontal scans - for(int bi = rl; bi < ru; bi += blksz2) { + for(int bi = rl; bi < ru; bi += blksz) { int bimax = Math.min(bi + blksz2, ru); - + // horizontal 
segment scan, incl pos maintenance for(int k = 0; k < numVals; k++) { int boff = _ptr[k]; @@ -331,13 +333,13 @@ public class ColGroupOLE extends ColGroupOffset { double val = aval[k]; int bix = apos[k]; + int len = _data[boff + bix]; + int pos = boff + bix + 1; + // LOG.error("Len: "+pos + " pos: "+bi + " ii " + len); for(int ii = bi; ii < bimax && bix < blen; ii += blksz) { // prepare length, start, and end pos - int len = _data[boff + bix]; - int pos = boff + bix + 1; - // compute partial results - LinearAlgebraUtils.vectAdd(val, c, _data, pos, ii, len); + LinearAlgebraUtils.vectAdd(val, c, _data, pos, ii, Math.min(len, ru)); bix += len + 1; } @@ -379,13 +381,16 @@ public class ColGroupOLE extends ColGroupOffset { } @Override - public void leftMultByRowVector(MatrixBlock vector, MatrixBlock result, int numVals) { - double[] a = ColGroupConverter.getDenseVector(vector); - double[] c = result.getDenseBlockValues(); + public void leftMultByRowVector(double[] a, double[] c, int numVals) { + numVals = (numVals == -1) ? 
getNumValues() : numVals; + final double[] values = getValues(); + leftMultByRowVector(a, c, numVals, values); + } + + @Override + public void leftMultByRowVector(double[] a, double[] c, int numVals, double[] values) { final int blksz = CompressionSettings.BITMAP_BLOCK_SZ; final int numCols = getNumCols(); - // final int numVals = getNumValues(); - final double[] values = getValues(); if(numVals >= 1 && _numRows > blksz) { // cache blocking config (see matrix-vector mult for explanation) @@ -447,12 +452,92 @@ public class ColGroupOLE extends ColGroupOffset { } @Override + public void leftMultByMatrix(double[] a, double[] c, int numVals, double[] values, int numRows, int numCols, int rl, + int ru, int voff) { + final int blksz = CompressionSettings.BITMAP_BLOCK_SZ; + final int thisNumCols = getNumCols(); + + if(numVals >= 1 && _numRows > blksz) { + + // cache blocking config (see matrix-vector mult for explanation) + final int blksz2 = 2 * CompressionSettings.BITMAP_BLOCK_SZ; + + // step 1: prepare position and value arrays + + // current pos per OLs / output values + int[] apos = allocIVector(numVals, true); + double[] cvals = allocDVector(numVals, true); + + for(int i = rl, off = voff* _numRows; i < ru; i++, off += _numRows) { + // step 2: cache conscious matrix-vector via horizontal scans + for(int ai = 0; ai < _numRows; ai += blksz2) { + int aimax = Math.min(ai + blksz2, _numRows); + + // horizontal segment scan, incl pos maintenance + for(int k = 0; k < numVals; k++) { + int boff = _ptr[k]; + int blen = len(k); + int bix = apos[k] + off; + double vsum = 0; + + for(int ii = ai; ii < aimax && bix < blen; ii += blksz) { + // prepare length, start, and end pos + int len = _data[boff + bix]; + int pos = boff + bix + 1; + + // iterate over bitmap blocks and compute partial results (a[i]*1) + vsum += LinearAlgebraUtils.vectSum(a, _data, ii + off, pos, len); + bix += len + 1; + } + + apos[k] = bix; + cvals[k] += vsum; + } + } + + // step 3: scale partial results by 
values and write to global output + for(int k = 0, valOff = 0; k < numVals; k++, valOff += thisNumCols) + for(int j = 0; j < thisNumCols; j++) { + int colIx = _colIndexes[j] + i * numCols; + c[colIx] += cvals[k] * values[valOff + j]; + } + } + } + else { + + for(int i = rl, offR = voff* _numRows; i < ru; i++, offR += _numRows) { + for(int k = 0, valOff = 0; k < numVals; k++, valOff += thisNumCols) { + int boff = _ptr[k]; + int blen = len(k); + + // iterate over bitmap blocks and add partial results + double vsum = 0; + for(int bix = 0, off = 0; bix < blen; bix += _data[boff + bix] + 1, off += blksz) + vsum += LinearAlgebraUtils.vectSum(a, _data, off + offR, boff + bix + 1, _data[boff + bix]); + + // scale partial results by values and write results + + for(int j = 0; j < thisNumCols; j++) { + int colIx = _colIndexes[j] + i * numCols; + c[colIx] += vsum * values[valOff + j]; + } + } + } + } + } + + // @Override + // public void leftMultByRowVector(double[] a, double[] c, int numVals, byte[] values) { + // throw new NotImplementedException("Not Implemented Byte fore OLE"); + // } + + @Override protected final void computeSum(double[] c, KahanFunction kplus) { c[0] += _dict.sum(getCounts(), _colIndexes.length, kplus); } @Override - protected final void computeRowSums(double[] c, KahanFunction kplus, int rl, int ru) { + protected final void computeRowSums(double[] c, KahanFunction kplus, int rl, int ru, boolean mean) { KahanObject kbuff = new KahanObject(0, 0); KahanPlus kplus2 = KahanPlus.getKahanPlusFnObject(); final int blksz = CompressionSettings.BITMAP_BLOCK_SZ; @@ -484,7 +569,7 @@ public class ColGroupOLE extends ColGroupOffset { // compute partial results for(int i = 0; i < len; i++) { int rix = ii + _data[pos + i]; - setandExecute(c, kbuff, kplus2, val, rix * 2); + setandExecute(c, kbuff, kplus2, val, rix * (2 + (mean ? 
1 : 0))); } bix += len + 1; } @@ -509,7 +594,7 @@ public class ColGroupOLE extends ColGroupOffset { slen = _data[boff + bix]; for(int i = 1; i <= slen; i++) { int rix = off + _data[boff + bix + i]; - setandExecute(c, kbuff, kplus2, val, rix * 2); + setandExecute(c, kbuff, kplus2, val, rix * (2 + (mean ? 1 : 0))); } } } diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupOffset.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupOffset.java index 912910a..1d6f1d1 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupOffset.java +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupOffset.java @@ -293,18 +293,6 @@ public abstract class ColGroupOffset extends ColGroupValue { protected abstract boolean[] computeZeroIndicatorVector(); - // protected boolean findZeros(){ - // boolean[] lind = computeZeroIndicatorVector(); - // _zeros = false; - // for(boolean x : lind){ - // if(x){ - // _zeros = true; - // break; - // } - // } - // return _zeros; - // } - @Override public Iterator<IJV> getIterator(int rl, int ru, boolean inclZeros, boolean rowMajor) { if(rowMajor) diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupRLE.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupRLE.java index b27ec8e..65658c7 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupRLE.java +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupRLE.java @@ -358,12 +358,15 @@ public class ColGroupRLE extends ColGroupOffset { } @Override - public void leftMultByRowVector(MatrixBlock vector, MatrixBlock result, int numVals) { - double[] a = ColGroupConverter.getDenseVector(vector); - double[] c = result.getDenseBlockValues(); - final int numCols = getNumCols(); - // final int numVals = getNumValues(); + public void leftMultByRowVector(double[] a, double[] c, int numVals) { + numVals = (numVals == -1) ? 
getNumValues() : numVals; final double[] values = getValues(); + leftMultByRowVector(a, c, numVals, values); + } + + @Override + public void leftMultByRowVector(double[] a, double[] c, int numVals, double[] values) { + final int numCols = getNumCols(); if(numVals >= 1 && _numRows > CompressionSettings.BITMAP_BLOCK_SZ) { final int blksz = 2 * CompressionSettings.BITMAP_BLOCK_SZ; @@ -429,6 +432,84 @@ public class ColGroupRLE extends ColGroupOffset { } @Override + public void leftMultByMatrix(double[] a, double[] c, int numVals, double[] values, int numRows, int numCols, int rl, + int ru, int voff) { + // throw new NotImplementedException(); + final int thisNumCols = getNumCols(); + + if(numVals >= 1 && _numRows > CompressionSettings.BITMAP_BLOCK_SZ) { + final int blksz = 2 * CompressionSettings.BITMAP_BLOCK_SZ; + + // double[] aRow = new double[a.length / numRows]; + // step 1: prepare position and value arrays + int[] astart = new int[numVals]; + for(int i = rl, off = voff * _numRows; i < ru; i++, off += _numRows) { + // System.arraycopy(a, (a.length / numRows) * i, aRow, 0, a.length / numRows); + // current pos per OLs / output values + int[] apos = allocIVector(numVals, true); + double[] cvals = allocDVector(numVals, true); + + // step 2: cache conscious matrix-vector via horizontal scans + for(int ai = 0; ai < _numRows; ai += blksz) { + int aimax = Math.min(ai + blksz, _numRows); + + // horizontal scan, incl pos maintenance + for(int k = 0; k < numVals; k++) { + int boff = _ptr[k]; + int blen = len(k); + int bix = apos[k]; + int start = astart[k]; + + // compute partial results, not aligned + while(bix < blen & start + off < aimax) { + start += _data[boff + bix]; + int len = _data[boff + bix + 1]; + cvals[k] += LinearAlgebraUtils.vectSum(a, start + off, len); + start += len; + bix += 2; + } + + apos[k] = bix; + astart[k] = start; + } + } + + // step 3: scale partial results by values and write to global output + for(int k = 0, valOff = 0; k < numVals; k++, valOff 
+= thisNumCols) + for(int j = 0; j < thisNumCols; j++){ + + int colIx = _colIndexes[j] + i * numCols; + c[colIx] += cvals[k] * values[valOff + j]; + } + } + } + else { + // iterate over all values and their bitmaps + for(int i = rl, off = voff * _numRows; i < ru; i++, off += _numRows) { + for(int k = 0, valOff = 0; k < numVals; k++, valOff += thisNumCols) { + int boff = _ptr[k]; + int blen = len(k); + + double vsum = 0; + int curRunEnd = 0; + for(int bix = 0; bix < blen; bix += 2) { + int curRunStartOff = curRunEnd + _data[boff + bix]; + int curRunLen = _data[boff + bix + 1]; + vsum += LinearAlgebraUtils.vectSum(a, curRunStartOff + off, curRunLen); + curRunEnd = curRunStartOff + curRunLen; + } + + for(int j = 0; j < thisNumCols; j++) { + int colIx = _colIndexes[j] + i * numCols; + // scale partial results by values and write results + c[colIx] += vsum * values[valOff + j]; + } + } + } + } + } + + @Override public ColGroup scalarOperation(ScalarOperator op) { double val0 = op.executeScalar(0); @@ -462,7 +543,7 @@ public class ColGroupRLE extends ColGroupOffset { } @Override - protected final void computeRowSums(double[] c, KahanFunction kplus, int rl, int ru) { + protected final void computeRowSums(double[] c, KahanFunction kplus, int rl, int ru, boolean mean) { KahanObject kbuff = new KahanObject(0, 0); KahanPlus kplus2 = KahanPlus.getKahanPlusFnObject(); @@ -497,7 +578,7 @@ public class ColGroupRLE extends ColGroupOffset { int from = Math.max(bi, start + lstart); int to = Math.min(start + lstart + llen, bimax); for(int rix = from; rix < to; rix++) { - setandExecute(c, kbuff, kplus2, val, rix); + setandExecute(c, kbuff, kplus2, val, rix * (2 + (mean ? 
1 : 0))); } if(start + lstart + llen >= bimax) break; @@ -525,7 +606,7 @@ public class ColGroupRLE extends ColGroupOffset { curRunStartOff = curRunEnd + _data[boff + bix]; curRunEnd = curRunStartOff + _data[boff + bix + 1]; for(int rix = curRunStartOff; rix < curRunEnd && rix < ru; rix++) { - setandExecute(c, kbuff, kplus2, val, rix * 2); + setandExecute(c, kbuff, kplus2, val, rix * (2 + (mean ? 1 : 0))); } } } diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSizes.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSizes.java index 709975f..e18fb14 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSizes.java +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSizes.java @@ -51,16 +51,12 @@ public class ColGroupSizes { public static long estimateInMemorySizeDDC(int nrCols, int uniqueVals, boolean lossy) { long size = estimateInMemorySizeGroupValue(nrCols, uniqueVals, lossy); - // if(!zeros){ - // size += -nrCols * 8; - // } return size; } public static long estimateInMemorySizeDDC1(int nrCols, int uniqueVals, int dataLength, boolean lossy) { if(uniqueVals > 255) return Long.MAX_VALUE; - // LOG.debug("DD1C: " + nrCols + " nr unique: " + uniqueVals + " DataLength: " + dataLength); long size = estimateInMemorySizeDDC(nrCols, uniqueVals, lossy); size += MemoryEstimates.byteArrayCost(dataLength); return size; @@ -69,14 +65,12 @@ public class ColGroupSizes { public static long estimateInMemorySizeDDC2(int nrCols, int uniqueVals, int dataLength, boolean lossy) { if(uniqueVals > Character.MAX_VALUE) return Long.MAX_VALUE; - // LOG.debug("DD2C: " + nrCols + "nr unique: " + uniqueVals +" datalen: "+ dataLength); long size = estimateInMemorySizeDDC(nrCols, uniqueVals, lossy); size += MemoryEstimates.charArrayCost(dataLength); return size; } public static long estimateInMemorySizeOffset(int nrColumns, int nrValues, int pointers, int offsetLength, boolean lossy) { - // 
LOG.debug("OFFSET list: nrC " + nrColumns +"\tnrV " + nrValues + "\tpl "+pointers +"\tdl "+ offsetLength); long size = estimateInMemorySizeGroupValue(nrColumns, nrValues, lossy); size += MemoryEstimates.intArrayCost(pointers); size += MemoryEstimates.charArrayCost(offsetLength); @@ -87,8 +81,6 @@ public class ColGroupSizes { nrColumns = nrColumns > 0 ? nrColumns : 1; offsetLength += (nrRows / CompressionSettings.BITMAP_BLOCK_SZ) * 2; long size = 0; - // LOG.debug("OLE cols: " + nrColumns + " vals: " + nrValues + " pointers: " + (nrValues / nrColumns + 1) - // + " offsetLength: " + (offsetLength) + " runs: " + nrValues / nrColumns); size = estimateInMemorySizeOffset(nrColumns, nrValues, (nrValues / nrColumns) + 1, offsetLength, lossy); if (nrRows > CompressionSettings.BITMAP_BLOCK_SZ * 2){ size += MemoryEstimates.intArrayCost((int) nrValues / nrColumns); @@ -99,7 +91,6 @@ public class ColGroupSizes { public static long estimateInMemorySizeRLE(int nrColumns, int nrValues, int nrRuns, int nrRows, boolean lossy) { nrColumns = nrColumns > 0 ? 
nrColumns : 1; int offsetLength = (nrRuns) * 2; - // LOG.debug("\n\tRLE cols: " + nrColumns + " vals: " + nrValues + " offsetLength: " + offsetLength); long size = estimateInMemorySizeOffset(nrColumns, nrValues, (nrValues / nrColumns) + 1, offsetLength, lossy); return size; diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupUncompressed.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupUncompressed.java index 74660a1..4f81210 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupUncompressed.java +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupUncompressed.java @@ -274,9 +274,23 @@ public class ColGroupUncompressed extends ColGroup { } @Override - public void leftMultByRowVector(MatrixBlock vector, MatrixBlock result, int numVals) { - MatrixBlock pret = new MatrixBlock(1, _colIndexes.length, false); - LibMatrixMult.matrixMult(vector, _data, pret); + public void leftMultByRowVector(double[] vector, double[] c, int numVals) { + throw new NotImplementedException("Should not be called use other matrix function"); + } + + @Override + public void leftMultByRowVector(double[] vector, double[] c, int numVals, double[] values) { + throw new NotImplementedException("Should not be called use other matrix function"); + } + + @Override + public void leftMultByMatrix(double[] vector, double[] c, int numVals, double[] values, int numRows, int numCols, int rl, int ru, int vOff) { + throw new NotImplementedException("Should not be called use other matrix function"); + } + + public void leftMultByMatrix(MatrixBlock matrix, MatrixBlock result) { + MatrixBlock pret = new MatrixBlock(matrix.getNumRows(), _colIndexes.length, false); + LibMatrixMult.matrixMult(matrix, _data, pret); // copying partialResult to the proper indices of the result if(!pret.isEmptyBlock(false)) { @@ -287,24 +301,6 @@ public class ColGroupUncompressed extends ColGroup { } } - // @Override - // public void 
leftMultByRowVector(ColGroupDDC vector, MatrixBlock result) { - // throw new NotImplementedException(); - // } - - // public void leftMultByRowVector(MatrixBlock vector, MatrixBlock result) { - // MatrixBlock pret = new MatrixBlock(1, _colIndexes.length, false); - // LibMatrixMult.matrixMult(vector, _data, pret, k); - - // // copying partialResult to the proper indices of the result - // if(!pret.isEmptyBlock(false)) { - // double[] rsltArr = result.getDenseBlockValues(); - // for(int colIx = 0; colIx < _colIndexes.length; colIx++) - // rsltArr[_colIndexes[colIx]] = pret.quickGetValue(0, colIx); - // result.recomputeNonZeros(); - // } - // } - @Override public ColGroup scalarOperation(ScalarOperator op) { // execute scalar operations diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupValue.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupValue.java index 3b184a2..d58b243 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupValue.java +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupValue.java @@ -34,6 +34,7 @@ import org.apache.sysds.runtime.functionobjects.Builtin.BuiltinCode; import org.apache.sysds.runtime.functionobjects.KahanFunction; import org.apache.sysds.runtime.functionobjects.KahanPlus; import org.apache.sysds.runtime.functionobjects.KahanPlusSq; +import org.apache.sysds.runtime.functionobjects.Mean; import org.apache.sysds.runtime.functionobjects.ReduceAll; import org.apache.sysds.runtime.functionobjects.ReduceCol; import org.apache.sysds.runtime.functionobjects.ReduceRow; @@ -113,6 +114,10 @@ public abstract class ColGroupValue extends ColGroup { return _dict.getValues(); } + public byte[] getByteValues() { + return ((QDictionary)_dict).getValuesByte(); + } + @Override public MatrixBlock getValuesAsBlock() { final double[] values = getValues(); @@ -263,14 +268,15 @@ public abstract class ColGroupValue extends ColGroup { @Override public void 
unaryAggregateOperations(AggregateUnaryOperator op, double[] c, int rl, int ru) { // sum and sumsq (reduceall/reducerow over tuples and counts) - if(op.aggOp.increOp.fn instanceof KahanPlus || op.aggOp.increOp.fn instanceof KahanPlusSq) { - KahanFunction kplus = (op.aggOp.increOp.fn instanceof KahanPlus) ? KahanPlus + if(op.aggOp.increOp.fn instanceof KahanPlus || op.aggOp.increOp.fn instanceof KahanPlusSq || op.aggOp.increOp.fn instanceof Mean) { + KahanFunction kplus = (op.aggOp.increOp.fn instanceof KahanPlus || op.aggOp.increOp.fn instanceof Mean) ? KahanPlus .getKahanPlusFnObject() : KahanPlusSq.getKahanPlusSqFnObject(); + boolean mean = op.aggOp.increOp.fn instanceof Mean; if(op.indexFn instanceof ReduceAll) computeSum(c, kplus); else if(op.indexFn instanceof ReduceCol) - computeRowSums(c, kplus, rl, ru); + computeRowSums(c, kplus, rl, ru, mean); else if(op.indexFn instanceof ReduceRow) computeColSums(c, kplus); } @@ -406,7 +412,7 @@ public abstract class ColGroupValue extends ColGroup { protected abstract void computeSum(double[] c, KahanFunction kplus); - protected abstract void computeRowSums(double[] c, KahanFunction kplus, int rl, int ru); + protected abstract void computeRowSums(double[] c, KahanFunction kplus, int rl, int ru, boolean mean); protected abstract void computeColSums(double[] c, KahanFunction kplus); diff --git a/src/main/java/org/apache/sysds/runtime/compress/utils/LinearAlgebraUtils.java b/src/main/java/org/apache/sysds/runtime/compress/utils/LinearAlgebraUtils.java index b24d6f1..4093b25 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/utils/LinearAlgebraUtils.java +++ b/src/main/java/org/apache/sysds/runtime/compress/utils/LinearAlgebraUtils.java @@ -51,6 +51,7 @@ public class LinearAlgebraUtils { LibMatrixMult.vectAdd(a, c, ai, ci, len); } + public static void vectAdd(final double aval, double[] c, char[] bix, final int bi, final int ci, final int len) { final int bn = len % 8; @@ -69,6 +70,7 @@ public class 
LinearAlgebraUtils { c[ci + bix[j + 6]] += aval; c[ci + bix[j + 7]] += aval; } + } public static void vectAdd(final double aval, double[] c, final int ci, final int len) { diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java index ed52481..0eb484a 100644 --- a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java +++ b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java @@ -207,6 +207,13 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab } } + public MatrixBlock(int rl, int cl, DenseBlock dBlock){ + rlen = rl; + clen = cl; + sparse = false; + denseBlock = dBlock; + } + //////// // Initialization methods // (reset, init, allocate, etc) diff --git a/src/test/java/org/apache/sysds/test/component/compress/AbstractCompressedUnaryTests.java b/src/test/java/org/apache/sysds/test/component/compress/AbstractCompressedUnaryTests.java index 560d234..dd94aa2 100644 --- a/src/test/java/org/apache/sysds/test/component/compress/AbstractCompressedUnaryTests.java +++ b/src/test/java/org/apache/sysds/test/component/compress/AbstractCompressedUnaryTests.java @@ -43,7 +43,8 @@ public abstract class AbstractCompressedUnaryTests extends CompressedTestBase { } enum AggType { - ROWSUMS, COLSUMS, SUM, ROWSUMSSQ, COLSUMSSQ, SUMSQ, ROWMAXS, COLMAXS, MAX, ROWMINS, COLMINS, MIN, MEAN + ROWSUMS, COLSUMS, SUM, ROWSUMSSQ, COLSUMSSQ, SUMSQ, ROWMAXS, COLMAXS, MAX, ROWMINS, COLMINS, MIN, MEAN, COLMEAN, + ROWMEAN } @Test @@ -106,14 +107,21 @@ public abstract class AbstractCompressedUnaryTests extends CompressedTestBase { testUnaryOperators(AggType.MIN); } - @Test(expected = NotImplementedException.class) + @Test public void testUnaryOperator_MEAN() { - // if Input was not compressed then just pass test - if(!(cmb instanceof CompressedMatrixBlock)) - throw new NotImplementedException("Test Passed"); testUnaryOperators(AggType.MEAN); } + @Test 
+ public void testUnaryOperator_COLMEAN() { + testUnaryOperators(AggType.COLMEAN); + } + + @Test + public void testUnaryOperator_ROWMEAN() { + testUnaryOperators(AggType.ROWMEAN); + } + protected AggregateUnaryOperator getUnaryOperator(AggType aggType, int threads) { switch(aggType) { case SUM: @@ -142,6 +150,10 @@ public abstract class AbstractCompressedUnaryTests extends CompressedTestBase { return InstructionUtils.parseBasicAggregateUnaryOperator("uacmin", threads); case MEAN: return InstructionUtils.parseBasicAggregateUnaryOperator("uamean", threads); + case ROWMEAN: + return InstructionUtils.parseBasicAggregateUnaryOperator("uarmean", threads); + case COLMEAN: + return InstructionUtils.parseBasicAggregateUnaryOperator("uacmean", threads); default: throw new NotImplementedException("Not Supported Aggregate Unary operator in test"); } @@ -162,9 +174,9 @@ public abstract class AbstractCompressedUnaryTests extends CompressedTestBase { double[][] d1 = DataConverter.convertToDoubleMatrix(ret1); double[][] d2 = DataConverter.convertToDoubleMatrix(ret2); int dim1 = (aggType == AggType.ROWSUMS || aggType == AggType.ROWSUMSSQ || aggType == AggType.ROWMAXS || - aggType == AggType.ROWMINS) ? rows : 1; + aggType == AggType.ROWMINS || aggType == AggType.ROWMEAN) ? rows : 1; int dim2 = (aggType == AggType.COLSUMS || aggType == AggType.COLSUMSSQ || aggType == AggType.COLMAXS || - aggType == AggType.COLMINS) ? cols : 1; + aggType == AggType.COLMINS || aggType == AggType.COLMEAN) ? 
cols : 1; assertTrue("dim 1 is equal in non compressed res", d1.length == dim1); assertTrue("dim 1 is equal in compressed res", d2.length == dim1); @@ -181,15 +193,25 @@ public abstract class AbstractCompressedUnaryTests extends CompressedTestBase { } else if(aggType == AggType.SUM) { TestUtils.compareMatrices(d1, d2, lossyTolerance * 10 * cols * rows, css); - + } + else if(aggType == AggType.MEAN) { + TestUtils.compareMatrices(d1, d2, lossyTolerance * cols * rows, css); + } + else if(aggType == AggType.ROWMEAN) { + TestUtils.compareMatrices(d1, d2, lossyTolerance, css); } else { boolean ignoreZero = true; - TestUtils.compareMatricesPercentageDistance(d1, d2, 0.1, 0.9, css, ignoreZero); + TestUtils.compareMatricesPercentageDistance(d1, d2, 0.8, 0.9, css, ignoreZero); } } else { - TestUtils.compareMatricesBitAvgDistance(d1, d2, 2048, 30, css); + if(aggType == AggType.ROWMEAN) { + TestUtils.compareMatrices(d1, d2, 0.0001, css); + } + else { + TestUtils.compareMatricesBitAvgDistance(d1, d2, 2048, 30, css); + } } } catch(NotImplementedException e) { diff --git a/src/test/java/org/apache/sysds/test/component/compress/CompressedTestBase.java b/src/test/java/org/apache/sysds/test/component/compress/CompressedTestBase.java index 2440c2d..b9276da 100644 --- a/src/test/java/org/apache/sysds/test/component/compress/CompressedTestBase.java +++ b/src/test/java/org/apache/sysds/test/component/compress/CompressedTestBase.java @@ -19,7 +19,7 @@ package org.apache.sysds.test.component.compress; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.util.ArrayList; import java.util.Collection; @@ -57,7 +57,7 @@ public abstract class CompressedTestBase extends TestBase { protected static final Log LOG = LogFactory.getLog(CompressedTestBase.class.getName()); protected static SparsityType[] usedSparsityTypes = new SparsityType[] { // Sparsity 0.9, 0.1, 0.01 and 0.0 - SparsityType.FULL, + // SparsityType.FULL, // SparsityType.DENSE, 
SparsityType.SPARSE, // SparsityType.ULTRA_SPARSE, @@ -67,14 +67,13 @@ public abstract class CompressedTestBase extends TestBase { protected static ValueType[] usedValueTypes = new ValueType[] { // ValueType.RAND, // ValueType.CONST, - ValueType.RAND_ROUND, + // ValueType.RAND_ROUND, ValueType.OLE_COMPRESSIBLE, // ValueType.RLE_COMPRESSIBLE, }; - protected static ValueRange[] usedValueRanges = new ValueRange[] { - ValueRange.SMALL, - // ValueRange.LARGE, + protected static ValueRange[] usedValueRanges = new ValueRange[] {ValueRange.SMALL, + ValueRange.LARGE, // ValueRange.BYTE }; @@ -84,30 +83,30 @@ public abstract class CompressedTestBase extends TestBase { // CLA TESTS! new CompressionSettingsBuilder().setSamplingRatio(0.1).setSeed(compressionSeed) - .setValidCompressions(EnumSet.of(CompressionType.DDC)).setInvestigateEstimate(true).create(), + .setValidCompressions(EnumSet.of(CompressionType.DDC)).setInvestigateEstimate(true).create(), new CompressionSettingsBuilder().setSamplingRatio(0.1).setSeed(compressionSeed) - .setValidCompressions(EnumSet.of(CompressionType.OLE)).setInvestigateEstimate(true).create(), + .setValidCompressions(EnumSet.of(CompressionType.OLE)).setInvestigateEstimate(true).create(), new CompressionSettingsBuilder().setSamplingRatio(0.1).setSeed(compressionSeed) - .setValidCompressions(EnumSet.of(CompressionType.RLE)).setInvestigateEstimate(true).create(), + .setValidCompressions(EnumSet.of(CompressionType.RLE)).setInvestigateEstimate(true).create(), new CompressionSettingsBuilder().setSamplingRatio(0.1).setSeed(compressionSeed).setInvestigateEstimate(true) - .create(), + .create(), new CompressionSettingsBuilder().setSamplingRatio(1.0).setSeed(compressionSeed).setInvestigateEstimate(true) - .setAllowSharedDictionary(false).setmaxStaticColGroupCoCode(1).create(), + .setAllowSharedDictionary(false).setmaxStaticColGroupCoCode(1).create(), - // LOSSY TESTS! + // // LOSSY TESTS! 
new CompressionSettingsBuilder().setSamplingRatio(0.1).setSeed(compressionSeed) - .setValidCompressions(EnumSet.of(CompressionType.DDC)).setInvestigateEstimate(true).setLossy(true).create(), + .setValidCompressions(EnumSet.of(CompressionType.DDC)).setInvestigateEstimate(true).setLossy(true).create(), new CompressionSettingsBuilder().setSamplingRatio(0.1).setSeed(compressionSeed) - .setValidCompressions(EnumSet.of(CompressionType.OLE)).setInvestigateEstimate(true).setLossy(true).create(), + .setValidCompressions(EnumSet.of(CompressionType.OLE)).setInvestigateEstimate(true).setLossy(true).create(), new CompressionSettingsBuilder().setSamplingRatio(0.1).setSeed(compressionSeed) - .setValidCompressions(EnumSet.of(CompressionType.RLE)).setInvestigateEstimate(true).setLossy(true).create(), + .setValidCompressions(EnumSet.of(CompressionType.RLE)).setInvestigateEstimate(true).setLossy(true).create(), new CompressionSettingsBuilder().setSamplingRatio(0.1).setSeed(compressionSeed).setInvestigateEstimate(true) .create(), - // new CompressionSettingsBuilder().setSamplingRatio(1.0).setSeed(compressionSeed).setInvestigateEstimate(true) - // .setAllowSharedDictionary(false).setmaxStaticColGroupCoCode(1).setLossy(true).create(), + new CompressionSettingsBuilder().setSamplingRatio(1.0).setSeed(compressionSeed).setInvestigateEstimate(true) + .setAllowSharedDictionary(false).setmaxStaticColGroupCoCode(1).setLossy(true).create(), // COCODING TESTS!! 
@@ -122,12 +121,12 @@ public abstract class CompressedTestBase extends TestBase { protected static MatrixTypology[] usedMatrixTypology = new MatrixTypology[] { // Selected Matrix Types MatrixTypology.SMALL, - // MatrixTypology.FEW_COL, + MatrixTypology.FEW_COL, // MatrixTypology.FEW_ROW, // MatrixTypology.LARGE, MatrixTypology.SINGLE_COL, // MatrixTypology.SINGLE_ROW, - MatrixTypology.L_ROWS, + // MatrixTypology.L_ROWS, // MatrixTypology.XL_ROWS, }; @@ -173,16 +172,17 @@ public abstract class CompressedTestBase extends TestBase { } catch(Exception e) { e.printStackTrace(); - assertTrue("\nCompressionTest Init failed with settings: " + this.toString(), false); + fail("\nCompressionTest Init failed with settings: " + this.toString()); } } - + /** + * Tolerance for encoding values is the maximum value in dataset divided by the number of distinct values available in + * a single Byte (since we encode our quantization in Byte) + * + * @param valueRange The value range used as input + */ private void setLossyTolerance(ValueRange valueRange) { - /** - * Tolerance for encoding values is the maximum value in dataset divided by number distinct values available in - * a single Byte (since we encode our quntization in Byte) - */ lossyTolerance = (double) (Math.max(TestConstants.getMaxRangeValue(valueRange), Math.abs(TestConstants.getMinRangeValue(valueRange)))) * (1.0 / 127.0) / 2.0; // LOG.debug("TOLERANCE IN TEST:" + lossyTolerance); @@ -394,6 +394,86 @@ public abstract class CompressedTestBase extends TestBase { } @Test + public void testLeftMatrixMatrixMultSmall() { + try { + if(!(cmb instanceof CompressedMatrixBlock)) + return; // Input was not compressed then just pass test + + MatrixBlock matrix = DataConverter + .convertToMatrixBlock(TestUtils.generateTestMatrix(2, rows, 0.9, 1.5, 1.0, 3)); + + // Make Operator + AggregateBinaryOperator abop = InstructionUtils.getMatMultOperator(_k); + + // vector-matrix uncompressed + MatrixBlock ret1 = mb.aggregateBinaryOperations(matrix, 
mb, new MatrixBlock(), abop); + + // vector-matrix compressed + MatrixBlock ret2 = cmb.aggregateBinaryOperations(matrix, cmb, new MatrixBlock(), abop); + + // compare result with input + double[][] d1 = DataConverter.convertToDoubleMatrix(ret1); + double[][] d2 = DataConverter.convertToDoubleMatrix(ret2); + if(compressionSettings.lossy) { + TestUtils.compareMatricesPercentageDistance(d1, d2, 0.25, 0.96, compressionSettings.toString()); + } + else { + // rows + if(rows > 65000) { + TestUtils.compareMatricesPercentageDistance(d1, d2, 0.01, 0.99, compressionSettings.toString()); + } + else { + + TestUtils.compareMatricesBitAvgDistance(d1, d2, 10000, 500, compressionSettings.toString()); + } + } + } + catch(Exception e) { + e.printStackTrace(); + throw new RuntimeException(this.toString() + "\n" + e.getMessage(), e); + } + } + + @Test + public void testLeftMatrixMatrixMultMedium() { + try { + if(!(cmb instanceof CompressedMatrixBlock)) + return; // Input was not compressed then just pass test + + MatrixBlock matrix = DataConverter + .convertToMatrixBlock(TestUtils.generateTestMatrix(402, rows, 0.9, 1.5, 1.0, 3)); + + // Make Operator + AggregateBinaryOperator abop = InstructionUtils.getMatMultOperator(_k); + + // vector-matrix uncompressed + MatrixBlock ret1 = mb.aggregateBinaryOperations(matrix, mb, new MatrixBlock(), abop); + + // vector-matrix compressed + MatrixBlock ret2 = cmb.aggregateBinaryOperations(matrix, cmb, new MatrixBlock(), abop); + + // compare result with input + double[][] d1 = DataConverter.convertToDoubleMatrix(ret1); + double[][] d2 = DataConverter.convertToDoubleMatrix(ret2); + if(compressionSettings.lossy) { + TestUtils.compareMatricesPercentageDistance(d1, d2, 0.25, 0.96, compressionSettings.toString()); + } + else { + if(rows > 65000) { + TestUtils.compareMatricesPercentageDistance(d1, d2, 0.01, 0.99, compressionSettings.toString()); + } + else { + TestUtils.compareMatricesBitAvgDistance(d1, d2, 10000, 500, compressionSettings.toString()); + } 
+ } + } + catch(Exception e) { + e.printStackTrace(); + throw new RuntimeException(this.toString() + "\n" + e.getMessage(), e); + } + } + + @Test public void testTransposeSelfMatrixMult() { try { if(!(cmb instanceof CompressedMatrixBlock)) @@ -413,9 +493,7 @@ public abstract class CompressedTestBase extends TestBase { double[][] d2 = DataConverter.convertToDoubleMatrix(ret2); // High probability that The value is off by some amount if(compressionSettings.lossy) { - /** - * Probably the worst thing you can do to increase the amount the values are estimated wrong - */ + //Probably the worst thing you can do to increase the amount the values are estimated wrong TestUtils.compareMatricesPercentageDistance(d1, d2, 0.0, 0.8, compressionSettings.toString()); } else { diff --git a/src/test/java/org/apache/sysds/test/component/compress/TestConstants.java b/src/test/java/org/apache/sysds/test/component/compress/TestConstants.java index a74c138..47895e8 100644 --- a/src/test/java/org/apache/sysds/test/component/compress/TestConstants.java +++ b/src/test/java/org/apache/sysds/test/component/compress/TestConstants.java @@ -24,7 +24,7 @@ package org.apache.sysds.test.component.compress; */ public class TestConstants { - private static final int rows[] = {4, 2008, 1283, 5, 1, 251, 5000, 100000, 3123}; + private static final int rows[] = {4, 2008, 1283, 5, 1, 251, 5000, 70000, 3123}; private static final int cols[] = {20, 20, 13, 998, 321, 1, 8, 10, 1}; private static final double[] sparsityValues = {0.9, 0.1, 0.01, 0.0, 1.0};
