This is an automated email from the ASF dual-hosted git repository. mboehm7 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/master by this push: new f308d56 [MINOR] Cleanup warnings (imports, serial IDs, static, formatting) f308d56 is described below commit f308d56c560df52785e52456ece60ac74de49672 Author: Matthias Boehm <mboe...@gmail.com> AuthorDate: Sat Oct 31 15:48:39 2020 +0100 [MINOR] Cleanup warnings (imports, serial IDs, static, formatting) --- .../runtime/compress/colgroup/ColGroupValue.java | 2 +- .../sysds/runtime/compress/lib/LibLeftMultBy.java | 1002 ++++++++-------- .../sysds/runtime/compress/lib/LibRightMultBy.java | 1198 ++++++++++---------- .../sysds/runtime/compress/lib/LibScalar.java | 288 ++--- .../paramserv/FederatedPSControlThread.java | 8 + .../finegrained/FineGrainedPrivacyList.java | 20 +- .../privacy/propagation/PrivacyPropagator.java | 1 - .../component/paramserv/SerializationTest.java | 8 +- .../test/functions/privacy/ReadWriteTest.java | 2 +- .../privacy/propagation/AppendPropagatorTest.java | 463 ++++---- 10 files changed, 1493 insertions(+), 1499 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupValue.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupValue.java index 21dfa85..ab06509 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupValue.java +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupValue.java @@ -323,7 +323,7 @@ public abstract class ColGroupValue extends ColGroup implements Cloneable { return ret; } - protected final double[] sparsePreaggValues(int numVals, double v, boolean allocNew, double[] dictVals) { + protected static double[] sparsePreaggValues(int numVals, double v, boolean allocNew, double[] dictVals) { double[] ret = allocNew ? new double[numVals + 1] : allocDVector(numVals + 1, true); for(int k = 0; k < numVals; k++) diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/LibLeftMultBy.java b/src/main/java/org/apache/sysds/runtime/compress/lib/LibLeftMultBy.java index 004e601..fd5a084 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/lib/LibLeftMultBy.java +++ b/src/main/java/org/apache/sysds/runtime/compress/lib/LibLeftMultBy.java @@ -45,505 +45,505 @@ import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.util.CommonThreadPool; public class LibLeftMultBy { - private static final Log LOG = LogFactory.getLog(LibLeftMultBy.class.getName()); - - public static MatrixBlock leftMultByMatrix(List<ColGroup> groups, MatrixBlock that, MatrixBlock ret, - boolean doTranspose, boolean allocTmp, int rl, int cl, boolean overlapping, int k, Pair<Integer, int[]> v) { - - if(ret == null) - ret = new MatrixBlock(rl, cl, false, rl * cl); - else if(!(ret.getNumColumns() == cl && ret.getNumRows() == rl && ret.isAllocated())) - ret.reset(rl, cl, false, rl * cl); - that = that instanceof CompressedMatrixBlock ? ((CompressedMatrixBlock) that).decompress() : that; - - // if(that.getNumRows() == 1) { - // if(k > 1) { - // return leftMultByVectorTranspose(groups, that, ret, doTranspose, k, v, overlapping); - // } - // else { - // return leftMultByVectorTranspose(groups, that, ret, doTranspose, true, v, overlapping); - // } - // } - // else { - return leftMultByMatrix(groups, that, ret, k, cl, v, overlapping); - // } - } - - public static void leftMultByTransposeSelf(List<ColGroup> groups, MatrixBlock result, int gl, int gu, int k, - int numColumns, Pair<Integer, int[]> v, boolean overlapping) { - if(k <= 1 || overlapping) { - leftMultByTransposeSelf(groups, result, gl, gu, v, overlapping); - } - else { - try { - ExecutorService pool = CommonThreadPool.get(k); - ArrayList<MatrixMultTransposeTask> tasks = new ArrayList<>(); - int numgrp = groups.size(); - int blklen = (int) (Math.ceil((double) numgrp / (2 * k))); - for(int i = 0; i < 2 * k & i * blklen < numColumns; i++) - tasks.add(new MatrixMultTransposeTask(groups, result, i * blklen, - Math.min((i + 1) * blklen, numgrp), v, overlapping)); - List<Future<Object>> ret = pool.invokeAll(tasks); - for(Future<Object> tret : ret) - tret.get(); // check for errors - pool.shutdown(); - } - catch(InterruptedException | ExecutionException e) { - throw new DMLRuntimeException(e); - } - } - } - - private static MatrixBlock leftMultByMatrix(List<ColGroup> colGroups, MatrixBlock that, MatrixBlock ret, int k, - int numColumns, Pair<Integer, int[]> v, boolean overlapping) { - ret.allocateDenseBlock(); - if(that.isInSparseFormat()) { - ret = leftMultBySparseMatrix(colGroups, that, ret, k, numColumns, v); - } - else { - ret = leftMultByDenseMatrix(colGroups, that, ret, k, numColumns, v, overlapping); - } - - ret.setNonZeros(ret.getNumColumns() * ret.getNumRows()); - return ret; - } - - private static MatrixBlock leftMultByDenseMatrix(List<ColGroup> colGroups, MatrixBlock that, MatrixBlock ret, int k, - int numColumns, Pair<Integer, int[]> v, boolean overlapping) { - DenseBlock db = that.getDenseBlock(); - if(db == null) - throw new DMLRuntimeException("Invalid LeftMult By Dense matrix, input matrix was sparse"); - - double[] retV = ret.getDenseBlockValues(); - double[] thatV; - int blockU; - int blockL = 0; - for(ColGroup grp : colGroups) - if(grp instanceof ColGroupUncompressed) - ((ColGroupUncompressed) grp).leftMultByMatrix(that, ret); - - for(int b = 0; b < db.numBlocks(); b++) { - int blockSize = db.blockSize(b); - blockU = Math.min(blockL + blockSize, ret.getNumRows()); - thatV = db.valuesAt(b); - - if(k == 1 || overlapping) { - // Pair<Integer, int[]> v = getMaxNumValues(colGroups); - for(int j = 0; j < colGroups.size(); j++) { - colGroups.get(j).leftMultByMatrix(thatV, - retV, - colGroups.get(j).getValues(), - that.getNumRows(), - ret.getNumColumns(), - 0, - ret.getNumRows(), - 0); - } - } - else { - try { - ExecutorService pool = CommonThreadPool.get(k); - // compute remaining compressed column groups in parallel - ArrayList<LeftMatrixMatrixMultTask> tasks = new ArrayList<>(); - int rowBlockSize = 1; - for(int blo = blockL; blo < blockU; blo += rowBlockSize) { - tasks.add(new LeftMatrixMatrixMultTask(colGroups, thatV, retV, that.getNumRows(), numColumns, - blo, Math.min(blo + rowBlockSize, blockU), blo - blockL, v)); - } - - List<Future<Object>> futures = pool.invokeAll(tasks); - - pool.shutdown(); - for(Future<Object> future : futures) - future.get(); - } - catch(InterruptedException | ExecutionException e) { - throw new DMLRuntimeException(e); - } - } - blockL += blockSize; - } - return ret; - } - - private static MatrixBlock leftMultByVectorTranspose(List<ColGroup> colGroups, MatrixBlock vector, - MatrixBlock result, boolean doTranspose, boolean allocTmp, Pair<Integer, int[]> v, boolean overlap) { - - MatrixBlock rowVector = vector; - // Note that transpose here is a metadata operation since the input is a vector. - if(doTranspose) { - rowVector = new MatrixBlock(1, vector.getNumRows(), false); - LibMatrixReorg.transpose(vector, rowVector); - } - - // initialize and allocate the result - result.reset(); - result.allocateDenseBlock(); - - // setup memory pool for reuse - if(allocTmp) { - // Pair<Integer, int[]> v = getMaxNumValues(colGroups); - ColGroupValue.setupThreadLocalMemory(v.getLeft() + 1); // +1 for efficiency in DDC groups. - for(int i = 0; i < colGroups.size(); i++) { - colGroups.get(i).leftMultByRowVector(rowVector.getDenseBlockValues(), - result.getDenseBlockValues(), - v.getRight()[i]); - } - } - else { - - for(ColGroup grp : colGroups) { - grp.leftMultByRowVector(rowVector.getDenseBlockValues(), result.getDenseBlockValues(), -1); - } - } - - // delegate matrix-vector operation to each column group - - // post-processing - if(allocTmp) - ColGroupValue.cleanupThreadLocalMemory(); - result.recomputeNonZeros(); - - return result; - } - - public static MatrixBlock leftMultByVectorTranspose(List<ColGroup> colGroups, MatrixBlock vector, - MatrixBlock result, boolean doTranspose, int k, Pair<Integer, int[]> v, boolean overlap) { - // transpose vector if required - MatrixBlock rowVector = vector; - if(doTranspose) { - rowVector = new MatrixBlock(1, vector.getNumRows(), false); - LibMatrixReorg.transpose(vector, rowVector); - } - - // initialize and allocate the result - result.reset(); - result.allocateDenseBlock(); - - // multi-threaded execution - try { - // compute uncompressed column group in parallel - // ColGroupUncompressed uc = getUncompressedColGroup(); - // if(uc != null) - // uc.leftMultByRowVector(rowVector, result, k); - - // compute remaining compressed column groups in parallel - ExecutorService pool = CommonThreadPool.get(Math.min(colGroups.size(), k)); - ArrayList<LeftMatrixVectorMultTask> tasks = new ArrayList<>(); - - // if(overlap){ - tasks.add(new LeftMatrixVectorMultTask(colGroups, rowVector, result, v)); - // } else{ - // ArrayList<ColGroup>[] grpParts = createStaticTaskPartitioning(colGroups, 4 * k, true); - // for(ArrayList<ColGroup> groups : grpParts) - // tasks.add(new LeftMatrixVectorMultTask(groups, rowVector, result, v)); - // } - - List<Future<Object>> ret = pool.invokeAll(tasks); - pool.shutdown(); - for(Future<Object> tmp : ret) - tmp.get(); - - } - catch(InterruptedException | ExecutionException e) { - LOG.error(e); - throw new DMLRuntimeException(e); - } - - // post-processing - result.recomputeNonZeros(); - - return result; - } - - private static MatrixBlock leftMultBySparseMatrix(List<ColGroup> colGroups, MatrixBlock that, MatrixBlock ret, - int k, int numColumns, Pair<Integer, int[]> v) { - - SparseBlock sb = that.getSparseBlock(); - if(sb == null) - throw new DMLRuntimeException("Invalid Left Mult by Sparse matrix, input matrix was dense"); - - for(ColGroup grp : colGroups) { - if(grp instanceof ColGroupUncompressed) - ((ColGroupUncompressed) grp).leftMultByMatrix(that, ret); - } - - if(k == 1) { - double[][] materialized = new double[colGroups.size()][]; - boolean containsOLE = false; - for(int i = 0; i < colGroups.size(); i++) { - materialized[i] = colGroups.get(i).getValues(); - if(colGroups.get(i) instanceof ColGroupOLE) { - containsOLE = true; - } - } - double[] materializedRow = containsOLE ? new double[CompressionSettings.BITMAP_BLOCK_SZ * 2] : null; - - for(int r = 0; r < that.getNumRows(); r++) { - SparseRow row = sb.get(r); - if(row != null) { - - for(int j = 0; j < colGroups.size(); j++) { - colGroups.get(j).leftMultBySparseMatrix(row.size(), - row.indexes(), - row.values(), - ret.getDenseBlockValues(), - v.getRight()[j], - materialized[j], - that.getNumRows(), - ret.getNumColumns(), - r, - materializedRow); - } - } - } - } - else { - ExecutorService pool = CommonThreadPool.get(k); - ArrayList<LeftMatrixSparseMatrixMultTask> tasks = new ArrayList<>(); - try { - // compute remaining compressed column groups in parallel - // List<ColGroup>[] parts = createStaticTaskPartitioningForSparseMatrixMult(colGroups, k, false); - // for(List<ColGroup> part : parts) { - tasks.add(new LeftMatrixSparseMatrixMultTask(colGroups, sb, ret.getDenseBlockValues(), - that.getNumRows(), numColumns, v)); - // } - - List<Future<Object>> futures = pool.invokeAll(tasks); - pool.shutdown(); - for(Future<Object> future : futures) - future.get(); - } - catch(InterruptedException | ExecutionException e) { - throw new DMLRuntimeException(e); - } - } - - return ret; - - } - - private static void leftMultByTransposeSelf(List<ColGroup> groups, MatrixBlock result, int gl, int gu, - Pair<Integer, int[]> v, boolean overlapping) { - final int numRows = groups.get(0).getNumRows(); - - // preallocated dense tmp matrix blocks - MatrixBlock lhs = new MatrixBlock(1, numRows, false); - MatrixBlock tmpret = new MatrixBlock(1, result.getNumColumns(), false); - lhs.allocateDenseBlock(); - tmpret.allocateDenseBlock(); - - // setup memory pool for reuse - ColGroupValue.setupThreadLocalMemory(v.getLeft() + 1); - - // approach: for each colgroup, extract uncompressed columns one at-a-time - // vector-matrix multiplies against remaining col groups - // for(int i = gl; i < gu; i++) { - // get current group and relevant col groups - // ColGroup group = groups.get(i); - // int[] ixgroup = group.getColIndices(); - // List<ColGroup> tmpList = groups.subList(i, numGroups); - - // if(group instanceof ColGroupDDC // single DDC group - // && ixgroup.length == 1 && !containsUC && numRows < CompressionSettings.BITMAP_BLOCK_SZ) { - // // compute vector-matrix partial result - // leftMultByVectorTranspose(tmpList, (ColGroupDDC) group, tmpret); - - // // write partial results (disjoint non-zeros) - // LinearAlgebraUtils.copyNonZerosToUpperTriangle(result, tmpret, ixgroup[0]); - // } - // else { - // for all uncompressed lhs columns vectors - for(int j = 0; j < result.getNumColumns(); j++) { - ColGroup.decompressToBlock(lhs, j, groups); - - if(!lhs.isEmptyBlock(false)) { - // tmpret.reset(); - // compute vector-matrix partial result - // leftMultByMatrix(groups,lhs, tmpret, false, true, 0, 0, overlapping, 1, v ); - leftMultByVectorTranspose(groups, lhs, tmpret, false, true, v, overlapping); - // LOG.error(tmpret); - - // write partial results (disjoint non-zeros) - LinearAlgebraUtils.copyNonZerosToUpperTriangle(result, tmpret, j); - } - lhs.reset(); - // } - // } - } - - // post processing - ColGroupValue.cleanupThreadLocalMemory(); - } - - private static class LeftMatrixVectorMultTask implements Callable<Object> { - private final List<ColGroup> _groups; - private final MatrixBlock _vect; - private final MatrixBlock _ret; - private final Pair<Integer, int[]> _v; - - protected LeftMatrixVectorMultTask(List<ColGroup> groups, MatrixBlock vect, MatrixBlock ret, - Pair<Integer, int[]> v) { - _groups = groups; - _vect = vect; - _ret = ret; - _v = v; - } - - @Override - public Object call() { - // setup memory pool for reuse - try { - ColGroupValue.setupThreadLocalMemory(_v.getLeft() + 1); - for(int i = 0; i < _groups.size(); i++) { - _groups.get(i) - .leftMultByRowVector(_vect.getDenseBlockValues(), _ret.getDenseBlockValues(), _v.getRight()[i]); - } - - ColGroupValue.cleanupThreadLocalMemory(); - } - catch(Exception e) { - throw new DMLRuntimeException(e); - } - return null; - } - } - - private static class LeftMatrixMatrixMultTask implements Callable<Object> { - private final List<ColGroup> _group; - private final double[] _that; - private final double[] _ret; - private final int _numRows; - private final int _numCols; - private final int _rl; - private final int _ru; - private final int _vOff; - private final Pair<Integer, int[]> _v; - - protected LeftMatrixMatrixMultTask(List<ColGroup> group, double[] that, double[] ret, int numRows, int numCols, - int rl, int ru, int vOff, Pair<Integer, int[]> v) { - _group = group; - _that = that; - _ret = ret; - _numRows = numRows; - _numCols = numCols; - _rl = rl; - _ru = ru; - _vOff = vOff; - _v = v; - } - - @Override - public Object call() { - // setup memory pool for reuse - - double[][] materialized = new double[_group.size()][]; - for(int i = 0; i < _group.size(); i++) { - materialized[i] = _group.get(i).getValues(); - } - // Pair<Integer, int[]> v = getMaxNumValues(_group); - try { - ColGroupValue.setupThreadLocalMemory(_v.getLeft() + 1); - for(int j = 0; j < _group.size(); j++) { - _group.get(j).leftMultByMatrix(_that, _ret, materialized[j], _numRows, _numCols, _rl, _ru, _vOff); - } - ColGroupValue.cleanupThreadLocalMemory(); - - } - catch(Exception e) { - throw new DMLRuntimeException(e); - } - return null; - } - } - - private static class LeftMatrixSparseMatrixMultTask implements Callable<Object> { - private final List<ColGroup> _group; - private final SparseBlock _that; - private final double[] _ret; - private final int _numRows; - private final int _numCols; - private final Pair<Integer, int[]> _v; - - protected LeftMatrixSparseMatrixMultTask(List<ColGroup> group, SparseBlock that, double[] ret, int numRows, - int numCols, Pair<Integer, int[]> v) { - _group = group; - _that = that; - _ret = ret; - _numRows = numRows; - _numCols = numCols; - _v = v; - } - - @Override - public Object call() { - // setup memory pool for reuse - - // double[][] materialized = new double[_group.size()][]; - // for(int i = 0; i < _group.size(); i++) { - // materialized[i] = _group.get(i).getValues(); - // } - - boolean containsOLE = false; - for(int j = 0; j < _group.size(); j++) { - if(_group.get(j) instanceof ColGroupOLE) { - containsOLE = true; - } - } - // Temporary Array to store 2 * block size in - double[] tmpA = containsOLE ? new double[CompressionSettings.BITMAP_BLOCK_SZ * 2] : null; - - ColGroupValue.setupThreadLocalMemory(_v.getLeft()); - try { - for(int j = 0; j < _group.size(); j++) { - double[] materializedV = _group.get(j).getValues(); - for(int r = 0; r < _that.numRows(); r++) { - if(_that.get(r) != null) { - _group.get(j).leftMultBySparseMatrix(_that.get(r).size(), - _that.get(r).indexes(), - _that.get(r).values(), - _ret, - _v.getRight()[j], - materializedV, - _numRows, - _numCols, - r, - tmpA); - } - } - } - } - catch(Exception e) { - e.printStackTrace(); - throw new DMLRuntimeException(e); - } - ColGroupValue.cleanupThreadLocalMemory(); - return null; - } - } - - private static class MatrixMultTransposeTask implements Callable<Object> { - private final List<ColGroup> _groups; - private final MatrixBlock _ret; - private final int _gl; - private final int _gu; - private final Pair<Integer, int[]> _v; - private final boolean _overlapping; - - protected MatrixMultTransposeTask(List<ColGroup> groups, MatrixBlock ret, int gl, int gu, - Pair<Integer, int[]> v, boolean overlapping) { - _groups = groups; - _ret = ret; - _gl = gl; - _gu = gu; - _v = v; - _overlapping = overlapping; - } - - @Override - public Object call() { - leftMultByTransposeSelf(_groups, _ret, _gl, _gu, _v, _overlapping); - return null; - } - } + private static final Log LOG = LogFactory.getLog(LibLeftMultBy.class.getName()); + + public static MatrixBlock leftMultByMatrix(List<ColGroup> groups, MatrixBlock that, MatrixBlock ret, + boolean doTranspose, boolean allocTmp, int rl, int cl, boolean overlapping, int k, Pair<Integer, int[]> v) { + + if(ret == null) + ret = new MatrixBlock(rl, cl, false, rl * cl); + else if(!(ret.getNumColumns() == cl && ret.getNumRows() == rl && ret.isAllocated())) + ret.reset(rl, cl, false, rl * cl); + that = that instanceof CompressedMatrixBlock ? ((CompressedMatrixBlock) that).decompress() : that; + + // if(that.getNumRows() == 1) { + // if(k > 1) { + // return leftMultByVectorTranspose(groups, that, ret, doTranspose, k, v, overlapping); + // } + // else { + // return leftMultByVectorTranspose(groups, that, ret, doTranspose, true, v, overlapping); + // } + // } + // else { + return leftMultByMatrix(groups, that, ret, k, cl, v, overlapping); + // } + } + + public static void leftMultByTransposeSelf(List<ColGroup> groups, MatrixBlock result, int gl, int gu, int k, + int numColumns, Pair<Integer, int[]> v, boolean overlapping) { + if(k <= 1 || overlapping) { + leftMultByTransposeSelf(groups, result, gl, gu, v, overlapping); + } + else { + try { + ExecutorService pool = CommonThreadPool.get(k); + ArrayList<MatrixMultTransposeTask> tasks = new ArrayList<>(); + int numgrp = groups.size(); + int blklen = (int) (Math.ceil((double) numgrp / (2 * k))); + for(int i = 0; i < 2 * k & i * blklen < numColumns; i++) + tasks.add(new MatrixMultTransposeTask(groups, result, i * blklen, + Math.min((i + 1) * blklen, numgrp), v, overlapping)); + List<Future<Object>> ret = pool.invokeAll(tasks); + for(Future<Object> tret : ret) + tret.get(); // check for errors + pool.shutdown(); + } + catch(InterruptedException | ExecutionException e) { + throw new DMLRuntimeException(e); + } + } + } + + private static MatrixBlock leftMultByMatrix(List<ColGroup> colGroups, MatrixBlock that, MatrixBlock ret, int k, + int numColumns, Pair<Integer, int[]> v, boolean overlapping) { + ret.allocateDenseBlock(); + if(that.isInSparseFormat()) { + ret = leftMultBySparseMatrix(colGroups, that, ret, k, numColumns, v); + } + else { + ret = leftMultByDenseMatrix(colGroups, that, ret, k, numColumns, v, overlapping); + } + + ret.setNonZeros(ret.getNumColumns() * ret.getNumRows()); + return ret; + } + + private static MatrixBlock leftMultByDenseMatrix(List<ColGroup> colGroups, MatrixBlock that, MatrixBlock ret, int k, + int numColumns, Pair<Integer, int[]> v, boolean overlapping) { + DenseBlock db = that.getDenseBlock(); + if(db == null) + throw new DMLRuntimeException("Invalid LeftMult By Dense matrix, input matrix was sparse"); + + double[] retV = ret.getDenseBlockValues(); + double[] thatV; + int blockU; + int blockL = 0; + for(ColGroup grp : colGroups) + if(grp instanceof ColGroupUncompressed) + ((ColGroupUncompressed) grp).leftMultByMatrix(that, ret); + + for(int b = 0; b < db.numBlocks(); b++) { + int blockSize = db.blockSize(b); + blockU = Math.min(blockL + blockSize, ret.getNumRows()); + thatV = db.valuesAt(b); + + if(k == 1 || overlapping) { + // Pair<Integer, int[]> v = getMaxNumValues(colGroups); + for(int j = 0; j < colGroups.size(); j++) { + colGroups.get(j).leftMultByMatrix(thatV, + retV, + colGroups.get(j).getValues(), + that.getNumRows(), + ret.getNumColumns(), + 0, + ret.getNumRows(), + 0); + } + } + else { + try { + ExecutorService pool = CommonThreadPool.get(k); + // compute remaining compressed column groups in parallel + ArrayList<LeftMatrixMatrixMultTask> tasks = new ArrayList<>(); + int rowBlockSize = 1; + for(int blo = blockL; blo < blockU; blo += rowBlockSize) { + tasks.add(new LeftMatrixMatrixMultTask(colGroups, thatV, retV, that.getNumRows(), numColumns, + blo, Math.min(blo + rowBlockSize, blockU), blo - blockL, v)); + } + + List<Future<Object>> futures = pool.invokeAll(tasks); + + pool.shutdown(); + for(Future<Object> future : futures) + future.get(); + } + catch(InterruptedException | ExecutionException e) { + throw new DMLRuntimeException(e); + } + } + blockL += blockSize; + } + return ret; + } + + private static MatrixBlock leftMultByVectorTranspose(List<ColGroup> colGroups, MatrixBlock vector, + MatrixBlock result, boolean doTranspose, boolean allocTmp, Pair<Integer, int[]> v, boolean overlap) { + + MatrixBlock rowVector = vector; + // Note that transpose here is a metadata operation since the input is a vector. + if(doTranspose) { + rowVector = new MatrixBlock(1, vector.getNumRows(), false); + LibMatrixReorg.transpose(vector, rowVector); + } + + // initialize and allocate the result + result.reset(); + result.allocateDenseBlock(); + + // setup memory pool for reuse + if(allocTmp) { + // Pair<Integer, int[]> v = getMaxNumValues(colGroups); + ColGroupValue.setupThreadLocalMemory(v.getLeft() + 1); // +1 for efficiency in DDC groups. + for(int i = 0; i < colGroups.size(); i++) { + colGroups.get(i).leftMultByRowVector(rowVector.getDenseBlockValues(), + result.getDenseBlockValues(), + v.getRight()[i]); + } + } + else { + + for(ColGroup grp : colGroups) { + grp.leftMultByRowVector(rowVector.getDenseBlockValues(), result.getDenseBlockValues(), -1); + } + } + + // delegate matrix-vector operation to each column group + + // post-processing + if(allocTmp) + ColGroupValue.cleanupThreadLocalMemory(); + result.recomputeNonZeros(); + + return result; + } + + public static MatrixBlock leftMultByVectorTranspose(List<ColGroup> colGroups, MatrixBlock vector, + MatrixBlock result, boolean doTranspose, int k, Pair<Integer, int[]> v, boolean overlap) { + // transpose vector if required + MatrixBlock rowVector = vector; + if(doTranspose) { + rowVector = new MatrixBlock(1, vector.getNumRows(), false); + LibMatrixReorg.transpose(vector, rowVector); + } + + // initialize and allocate the result + result.reset(); + result.allocateDenseBlock(); + + // multi-threaded execution + try { + // compute uncompressed column group in parallel + // ColGroupUncompressed uc = getUncompressedColGroup(); + // if(uc != null) + // uc.leftMultByRowVector(rowVector, result, k); + + // compute remaining compressed column groups in parallel + ExecutorService pool = CommonThreadPool.get(Math.min(colGroups.size(), k)); + ArrayList<LeftMatrixVectorMultTask> tasks = new ArrayList<>(); + + // if(overlap){ + tasks.add(new LeftMatrixVectorMultTask(colGroups, rowVector, result, v)); + // } else{ + // ArrayList<ColGroup>[] grpParts = createStaticTaskPartitioning(colGroups, 4 * k, true); + // for(ArrayList<ColGroup> groups : grpParts) + // tasks.add(new LeftMatrixVectorMultTask(groups, rowVector, result, v)); + // } + + List<Future<Object>> ret = pool.invokeAll(tasks); + pool.shutdown(); + for(Future<Object> tmp : ret) + tmp.get(); + + } + catch(InterruptedException | ExecutionException e) { + LOG.error(e); + throw new DMLRuntimeException(e); + } + + // post-processing + result.recomputeNonZeros(); + + return result; + } + + private static MatrixBlock leftMultBySparseMatrix(List<ColGroup> colGroups, MatrixBlock that, MatrixBlock ret, + int k, int numColumns, Pair<Integer, int[]> v) { + + SparseBlock sb = that.getSparseBlock(); + if(sb == null) + throw new DMLRuntimeException("Invalid Left Mult by Sparse matrix, input matrix was dense"); + + for(ColGroup grp : colGroups) { + if(grp instanceof ColGroupUncompressed) + ((ColGroupUncompressed) grp).leftMultByMatrix(that, ret); + } + + if(k == 1) { + double[][] materialized = new double[colGroups.size()][]; + boolean containsOLE = false; + for(int i = 0; i < colGroups.size(); i++) { + materialized[i] = colGroups.get(i).getValues(); + if(colGroups.get(i) instanceof ColGroupOLE) { + containsOLE = true; + } + } + double[] materializedRow = containsOLE ? new double[CompressionSettings.BITMAP_BLOCK_SZ * 2] : null; + + for(int r = 0; r < that.getNumRows(); r++) { + SparseRow row = sb.get(r); + if(row != null) { + + for(int j = 0; j < colGroups.size(); j++) { + colGroups.get(j).leftMultBySparseMatrix(row.size(), + row.indexes(), + row.values(), + ret.getDenseBlockValues(), + v.getRight()[j], + materialized[j], + that.getNumRows(), + ret.getNumColumns(), + r, + materializedRow); + } + } + } + } + else { + ExecutorService pool = CommonThreadPool.get(k); + ArrayList<LeftMatrixSparseMatrixMultTask> tasks = new ArrayList<>(); + try { + // compute remaining compressed column groups in parallel + // List<ColGroup>[] parts = createStaticTaskPartitioningForSparseMatrixMult(colGroups, k, false); + // for(List<ColGroup> part : parts) { + tasks.add(new LeftMatrixSparseMatrixMultTask(colGroups, sb, ret.getDenseBlockValues(), + that.getNumRows(), numColumns, v)); + // } + + List<Future<Object>> futures = pool.invokeAll(tasks); + pool.shutdown(); + for(Future<Object> future : futures) + future.get(); + } + catch(InterruptedException | ExecutionException e) { + throw new DMLRuntimeException(e); + } + } + + return ret; + + } + + private static void leftMultByTransposeSelf(List<ColGroup> groups, MatrixBlock result, int gl, int gu, + Pair<Integer, int[]> v, boolean overlapping) { + final int numRows = groups.get(0).getNumRows(); + + // preallocated dense tmp matrix blocks + MatrixBlock lhs = new MatrixBlock(1, numRows, false); + MatrixBlock tmpret = new MatrixBlock(1, result.getNumColumns(), false); + lhs.allocateDenseBlock(); + tmpret.allocateDenseBlock(); + + // setup memory pool for reuse + ColGroupValue.setupThreadLocalMemory(v.getLeft() + 1); + + // approach: for each colgroup, extract uncompressed columns one at-a-time + // vector-matrix multiplies against remaining col groups + // for(int i = gl; i < gu; i++) { + // get current group and relevant col groups + // ColGroup group = groups.get(i); + // int[] ixgroup = group.getColIndices(); + // List<ColGroup> tmpList = groups.subList(i, numGroups); + + // if(group instanceof ColGroupDDC // single DDC group + // && ixgroup.length == 1 && !containsUC && numRows < CompressionSettings.BITMAP_BLOCK_SZ) { + // // compute vector-matrix partial result + // leftMultByVectorTranspose(tmpList, (ColGroupDDC) group, tmpret); + + // // write partial results (disjoint non-zeros) + // LinearAlgebraUtils.copyNonZerosToUpperTriangle(result, tmpret, ixgroup[0]); + // } + // else { + // for all uncompressed lhs columns vectors + for(int j = 0; j < result.getNumColumns(); j++) { + ColGroup.decompressToBlock(lhs, j, groups); + + if(!lhs.isEmptyBlock(false)) { + // tmpret.reset(); + // compute vector-matrix partial result + // leftMultByMatrix(groups,lhs, tmpret, false, true, 0, 0, overlapping, 1, v ); + leftMultByVectorTranspose(groups, lhs, tmpret, false, true, v, overlapping); + // LOG.error(tmpret); + + // write partial results (disjoint non-zeros) + LinearAlgebraUtils.copyNonZerosToUpperTriangle(result, tmpret, j); + } + lhs.reset(); + // } + // } + } + + // post processing + ColGroupValue.cleanupThreadLocalMemory(); + } + + private static class LeftMatrixVectorMultTask implements Callable<Object> { + private final List<ColGroup> _groups; + private final MatrixBlock _vect; + private final MatrixBlock _ret; + private final Pair<Integer, int[]> _v; + + protected LeftMatrixVectorMultTask(List<ColGroup> groups, MatrixBlock vect, MatrixBlock ret, + Pair<Integer, int[]> v) { + _groups = groups; + _vect = vect; + _ret = ret; + _v = v; + } + + @Override + public Object call() { + // setup memory pool for reuse + try { + ColGroupValue.setupThreadLocalMemory(_v.getLeft() + 1); + for(int i = 0; i < _groups.size(); i++) { + _groups.get(i) + .leftMultByRowVector(_vect.getDenseBlockValues(), _ret.getDenseBlockValues(), _v.getRight()[i]); + } + + ColGroupValue.cleanupThreadLocalMemory(); + } + catch(Exception e) { + throw new DMLRuntimeException(e); + } + return null; + } + } + + private static class LeftMatrixMatrixMultTask implements Callable<Object> { + private final List<ColGroup> _group; + private final double[] _that; + private final double[] _ret; + private final int _numRows; + private final int _numCols; + private final int _rl; + private final int _ru; + private final int _vOff; + private final Pair<Integer, int[]> _v; + + protected LeftMatrixMatrixMultTask(List<ColGroup> group, double[] that, double[] ret, int numRows, int numCols, + int rl, int ru, int vOff, Pair<Integer, int[]> v) { + _group = group; + _that = that; + _ret = ret; + _numRows = numRows; + _numCols = numCols; + _rl = rl; + _ru = ru; + _vOff = vOff; + _v = v; + } + + @Override + public Object call() { + // setup memory pool for reuse + + double[][] materialized = new double[_group.size()][]; + for(int i = 0; i < _group.size(); i++) { + materialized[i] = _group.get(i).getValues(); + } + // Pair<Integer, int[]> v = getMaxNumValues(_group); + try { + ColGroupValue.setupThreadLocalMemory(_v.getLeft() + 1); + for(int j = 0; j < _group.size(); j++) { + _group.get(j).leftMultByMatrix(_that, _ret, materialized[j], _numRows, _numCols, _rl, _ru, _vOff); + } + ColGroupValue.cleanupThreadLocalMemory(); + + } + catch(Exception e) { + throw new DMLRuntimeException(e); + } + return null; + } + } + + private static class LeftMatrixSparseMatrixMultTask implements Callable<Object> { + private final List<ColGroup> _group; + private final SparseBlock _that; + private final double[] _ret; + private final int _numRows; + private final int _numCols; + private final Pair<Integer, int[]> _v; + + protected LeftMatrixSparseMatrixMultTask(List<ColGroup> group, SparseBlock that, double[] ret, int numRows, + int numCols, Pair<Integer, int[]> v) { + _group = group; + _that = that; + _ret = ret; + _numRows = numRows; + _numCols = numCols; + _v = v; + } + + @Override + public Object call() { + // setup memory pool for reuse + + // double[][] materialized = new double[_group.size()][]; + // for(int i = 0; i < _group.size(); i++) { + // materialized[i] = _group.get(i).getValues(); + // } + + boolean containsOLE = false; + for(int j = 0; j < _group.size(); j++) { + if(_group.get(j) instanceof ColGroupOLE) { + containsOLE = true; + } + } + // Temporary Array to store 2 * block size in + double[] tmpA = containsOLE ? new double[CompressionSettings.BITMAP_BLOCK_SZ * 2] : null; + + ColGroupValue.setupThreadLocalMemory(_v.getLeft()); + try { + for(int j = 0; j < _group.size(); j++) { + double[] materializedV = _group.get(j).getValues(); + for(int r = 0; r < _that.numRows(); r++) { + if(_that.get(r) != null) { + _group.get(j).leftMultBySparseMatrix(_that.get(r).size(), + _that.get(r).indexes(), + _that.get(r).values(), + _ret, + _v.getRight()[j], + materializedV, + _numRows, + _numCols, + r, + tmpA); + } + } + } + } + catch(Exception e) { + e.printStackTrace(); + throw new DMLRuntimeException(e); + } + ColGroupValue.cleanupThreadLocalMemory(); + return null; + } + } + + private static class MatrixMultTransposeTask implements Callable<Object> { + private final List<ColGroup> _groups; + private final MatrixBlock _ret; + private final int _gl; + private final int _gu; + private final Pair<Integer, int[]> _v; + private final boolean _overlapping; + + protected MatrixMultTransposeTask(List<ColGroup> groups, MatrixBlock ret, int gl, int gu, + Pair<Integer, int[]> v, boolean overlapping) { + _groups = groups; + _ret = ret; + _gl = gl; + _gu = gu; + _v = v; + _overlapping = overlapping; + } + + @Override + public Object call() { + leftMultByTransposeSelf(_groups, _ret, _gl, _gu, _v, _overlapping); + return null; + } + } } diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/LibRightMultBy.java b/src/main/java/org/apache/sysds/runtime/compress/lib/LibRightMultBy.java index df0f45e..761e5a4 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/lib/LibRightMultBy.java +++ b/src/main/java/org/apache/sysds/runtime/compress/lib/LibRightMultBy.java @@ -43,603 +43,603 @@ import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.util.CommonThreadPool; public class LibRightMultBy { - private static final Log LOG = LogFactory.getLog(LibRightMultBy.class.getName()); - - /** - * Right multiply by matrix. Meaning a left hand side compressed matrix is multiplied with a right hand side - * uncompressed matrix. - * - * @param colGroups All Column groups in the compression - * @param that The right hand side matrix - * @param ret The MatrixBlock to return. - * @param k The parallelization degree to use. - * @param v The Precalculated counts and Maximum number of tuple entries in the column groups. - * @param allowOverlap Allow the multiplication to return an overlapped matrix. - * @return The Result Matrix, modified from the ret parameter. - */ - public static MatrixBlock rightMultByMatrix(List<ColGroup> colGroups, MatrixBlock that, MatrixBlock ret, int k, - Pair<Integer, int[]> v, boolean allowOverlap) { - - boolean containsUncompressable = false; - int distinctCount = 0; - for(ColGroup g : colGroups) { - if(g instanceof ColGroupValue) { - distinctCount += ((ColGroupValue) g).getNumValues(); - } - else { - containsUncompressable = true; - } - } - int rl = colGroups.get(0).getNumRows(); - int cl = that.getNumColumns(); - if(!allowOverlap || (containsUncompressable || distinctCount >= rl / 2)) { - if(ret == null) - ret = new MatrixBlock(rl, cl, false, rl * cl); - else if(!(ret.getNumColumns() == cl && ret.getNumRows() == rl && ret.isAllocated())) - ret.reset(rl, cl, false, rl * cl); - ret.allocateDenseBlock(); - if(that.isInSparseFormat()) { - ret = rightMultBySparseMatrix(colGroups, that, ret, k, v); - } - else { - ret = rightMultByDenseMatrix(colGroups, that, ret, k, v); - - } - ret.setNonZeros(ret.getNumColumns() * ret.getNumRows()); - } - else { - // Create an overlapping compressed Matrix Block. - ret = new CompressedMatrixBlock(true); - - ret.setNumColumns(cl); - ret.setNumRows(rl); - CompressedMatrixBlock retC = (CompressedMatrixBlock) ret; - retC.setOverlapping(true); - if(that.isInSparseFormat()) { - ret = rightMultBySparseMatrixCompressed(colGroups, that, retC, k, v); - } - else { - ret = rightMultByDenseMatrixCompressed(colGroups, that, retC, k, v); - } - } - - return ret; - - } - - /** - * Multi-threaded version of rightMultByVector. - * - * @param colGroups The Column groups used int the multiplication - * @param vector matrix block vector to multiply with - * @param result matrix block result to modify in the multiplication - * @param k number of threads to use - * @param v The Precalculated counts and Maximum number of tuple entries in the column groups - */ - public static void rightMultByVector(List<ColGroup> colGroups, MatrixBlock vector, MatrixBlock result, int k, - Pair<Integer, int[]> v) { - // initialize and allocate the result - result.allocateDenseBlock(); - if(k <= 1) { - rightMultByVector(colGroups, vector, result, v); - return; - } - - // multi-threaded execution of all groups - try { - // ColGroupUncompressed uc = getUncompressedColGroup(); - - // compute uncompressed column group in parallel - // if(uc != null) - // uc.rightMultByVector(vector, result, k); - - // compute remaining compressed column groups in parallel - // note: OLE needs alignment to segment size, otherwise wrong entry - ExecutorService pool = CommonThreadPool.get(k); - int rlen = colGroups.get(0).getNumRows(); - int seqsz = CompressionSettings.BITMAP_BLOCK_SZ; - int blklen = (int) (Math.ceil((double) rlen / k)); - blklen += (blklen % seqsz != 0) ? seqsz - blklen % seqsz : 0; - - ArrayList<RightMatrixVectorMultTask> tasks = new ArrayList<>(); - for(int i = 0; i < k & i * blklen < rlen; i++) { - tasks.add(new RightMatrixVectorMultTask(colGroups, vector, result, i * blklen, - Math.min((i + 1) * blklen, rlen), v)); - } - - List<Future<Long>> ret = pool.invokeAll(tasks); - pool.shutdown(); - - // error handling and nnz aggregation - long lnnz = 0; - for(Future<Long> tmp : ret) - lnnz += tmp.get(); - result.setNonZeros(lnnz); - } - catch(InterruptedException | ExecutionException e) { - throw new DMLRuntimeException(e); - } - } - - /** - * Multiply this matrix block by a column vector on the right. - * - * @param vector right-hand operand of the multiplication - * @param result buffer to hold the result; must have the appropriate size already - * @param v The Precalculated counts and Maximum number of tuple entries in the column groups. - */ - private static void rightMultByVector(List<ColGroup> colGroups, MatrixBlock vector, MatrixBlock result, - Pair<Integer, int[]> v) { - - // delegate matrix-vector operation to each column group - rightMultByVector(colGroups, vector, result, 0, result.getNumRows(), v); - - // post-processing - result.recomputeNonZeros(); - } - - private static MatrixBlock rightMultBySparseMatrix(List<ColGroup> colGroups, MatrixBlock that, MatrixBlock ret, - int k, Pair<Integer, int[]> v) { - SparseBlock sb = that.getSparseBlock(); - double[] retV = ret.getDenseBlockValues(); - - if(sb == null) - throw new DMLRuntimeException("Invalid Right Mult by Sparse matrix, input matrix was dense"); - - for(ColGroup grp : colGroups) { - if(grp instanceof ColGroupUncompressed) - ((ColGroupUncompressed) grp).rightMultByMatrix(that, ret, 0, ret.getNumColumns()); - } - - // Pair<Integer, int[]> v = Util.getMaxNumValues(colGroups); - // if(k == 1) { - for(int j = 0; j < colGroups.size(); j++) { - double[] preAggregatedB = ((ColGroupValue) colGroups.get(j)).preaggValues(v.getRight()[j], - sb, - colGroups.get(j).getValues(), - 0, - that.getNumColumns(), - that.getNumColumns()); - colGroups.get(j).rightMultByMatrix(preAggregatedB, - retV, - that.getNumColumns(), - 0, - ret.getNumRows(), - 0, - that.getNumColumns()); - - } - // } - // else { - // ExecutorService pool = CommonThreadPool.get(k); - // ArrayList<RightMultBySparseMatrixTask> tasks = new ArrayList<>(); - // try { - - // for(int j = 0; j < ret.getNumColumns(); j += CompressionSettings.BITMAP_BLOCK_SZ) { - // tasks.add(new RightMultBySparseMatrixTask(colGroups, retV, sb, materialized, v, numColumns, j, - // Math.min(j + CompressionSettings.BITMAP_BLOCK_SZ, ret.getNumColumns()))); - // } - - // List<Future<Object>> futures = pool.invokeAll(tasks); - // pool.shutdown(); - // for(Future<Object> future : futures) - // future.get(); - // } - // catch(InterruptedException | ExecutionException e) { - // throw new DMLRuntimeException(e); - // } - // } - - return ret; - } - - private static MatrixBlock rightMultByDenseMatrix(List<ColGroup> colGroups, MatrixBlock that, MatrixBlock ret, - int k, Pair<Integer, int[]> v) { - - // long StartTime = System.currentTimeMillis(); - DenseBlock db = that.getDenseBlock(); - double[] retV = ret.getDenseBlockValues(); - double[] thatV; - - for(ColGroup grp : colGroups) { - if(grp instanceof ColGroupUncompressed) { - ((ColGroupUncompressed) grp).rightMultByMatrix(that, ret, 0, ret.getNumRows()); - } - } - - if(k == 1) { - ColGroupValue.setupThreadLocalMemory((v.getLeft())); - for(int b = 0; b < db.numBlocks(); b++) { - // int blockSize = db.blockSize(b); - thatV = db.valuesAt(b); - for(int j = 0; j < colGroups.size(); j++) { - int colBlockSize = 128; - for(int i = 0; i < that.getNumColumns(); i += colBlockSize) { - if(colGroups.get(j) instanceof ColGroupValue) { - double[] preAggregatedB = ((ColGroupValue) colGroups.get(j)).preaggValues(v.getRight()[j], - thatV, - colGroups.get(j).getValues(), - i, - Math.min(i + colBlockSize, that.getNumColumns()), - that.getNumColumns()); - int blklenRows = CompressionSettings.BITMAP_BLOCK_SZ; - for(int n = 0; n * blklenRows < ret.getNumRows(); n++) { - colGroups.get(j).rightMultByMatrix(preAggregatedB, - retV, - that.getNumColumns(), - n * blklenRows, - Math.min((n + 1) * blklenRows, ret.getNumRows()), - i, - Math.min(i + colBlockSize, that.getNumColumns())); - } - } - } - } - } - ColGroupValue.cleanupThreadLocalMemory(); - } - else { - - thatV = db.valuesAt(0); - ExecutorService pool = CommonThreadPool.get(k); - ArrayList<RightMatrixMultTask> tasks = new ArrayList<>(); - ArrayList<RightMatrixPreAggregateTask> preTask = new ArrayList<>(colGroups.size()); - // Pair<Integer, int[]> v; - final int blkz = CompressionSettings.BITMAP_BLOCK_SZ; - int blklenRows = (int) (Math.ceil((double) ret.getNumRows() / (2 * k))); - - try { - List<Future<double[]>> ag = pool.invokeAll(preAggregate(colGroups, thatV, that, preTask, v)); - // DDC and RLE - for(int j = 0; j * blklenRows < ret.getNumRows(); j++) { - RightMatrixMultTask rmmt = new RightMatrixMultTask(colGroups, retV, ag, v, that.getNumColumns(), - j * blklenRows, Math.min((j + 1) * blklenRows, ret.getNumRows()), 0, that.getNumColumns(), - false, false); - tasks.add(rmmt); - } - blklenRows += (blklenRows % blkz != 0) ? blkz - blklenRows % blkz : 0; - // OLE! - for(int j = 0; j * blklenRows < ret.getNumRows(); j++) { - RightMatrixMultTask rmmt = new RightMatrixMultTask(colGroups, retV, ag, v, that.getNumColumns(), - j * blklenRows, Math.min((j + 1) * blklenRows, ret.getNumRows()), 0, that.getNumColumns(), - false, true); - tasks.add(rmmt); - } - for(Future<Object> future : pool.invokeAll(tasks)) - future.get(); - tasks.clear(); - - } - catch(InterruptedException | ExecutionException e) { - throw new DMLRuntimeException(e); - } - } - - return ret; - } - - private static MatrixBlock rightMultByDenseMatrixCompressed(List<ColGroup> colGroups, MatrixBlock that, - CompressedMatrixBlock ret, int k, Pair<Integer, int[]> v) { - - DenseBlock db = that.getDenseBlock(); - double[] thatV; - - for(ColGroup grp : colGroups) { - if(grp instanceof ColGroupUncompressed) { - throw new DMLCompressionException( - "Right Mult by dense with compressed output is not efficient to do with uncompressed Compressed ColGroups and therefore not supported."); - } - } - - thatV = db.valuesAt(0); - List<ColGroup> retCg = new ArrayList<ColGroup>(); - int[] newColIndexes = new int[that.getNumColumns()]; - for(int i = 0; i < that.getNumColumns(); i++) { - newColIndexes[i] = i; - } - if(k == 1) { - for(int j = 0; j < colGroups.size(); j++) { - ColGroupValue g = (ColGroupValue) colGroups.get(j); - double[] preAggregatedB = g.preaggValues(v.getRight()[j], - thatV, - g.getValues(), - 0, - that.getNumColumns(), - that.getNumColumns(), - new double[v.getRight()[j] * that.getNumColumns()]); - retCg.add(g.copyAndSet(newColIndexes, preAggregatedB)); - } - } - else { - thatV = db.valuesAt(0); - ExecutorService pool = CommonThreadPool.get(k); - ArrayList<RightMatrixPreAggregateTask> preTask = new ArrayList<>(colGroups.size()); - - try { - List<Future<double[]>> ag = pool.invokeAll(preAggregate(colGroups, thatV, that, preTask, v)); - for(int j = 0; j < colGroups.size(); j++) { - retCg.add(((ColGroupValue) colGroups.get(j)).copyAndSet(newColIndexes, ag.get(j).get())); - } - } - catch(InterruptedException | ExecutionException e) { - throw new DMLRuntimeException(e); - } - } - ret.allocateColGroupList(retCg); - ret.setOverlapping(true); - ret.setNonZeros(-1); - - return ret; - } - - private static MatrixBlock rightMultBySparseMatrixCompressed(List<ColGroup> colGroups, MatrixBlock that, - CompressedMatrixBlock ret, int k, Pair<Integer, int[]> v) { - - // long StartTime = System.currentTimeMillis(); - SparseBlock sb = that.getSparseBlock(); - - for(ColGroup grp : colGroups) { - if(grp instanceof ColGroupUncompressed) { - throw new DMLCompressionException( - "Right Mult by dense with compressed output is not efficient to do with uncompressed Compressed ColGroups and therefore not supported."); - } - } - - List<ColGroup> retCg = new ArrayList<ColGroup>(); - int[] newColIndexes = new int[that.getNumColumns()]; - for(int i = 0; i < that.getNumColumns(); i++) { - newColIndexes[i] = i; - } - if(k == 1) { - for(int j = 0; j < colGroups.size(); j++) { - ColGroupValue g = (ColGroupValue) colGroups.get(j); - double[] preAggregatedB = g.preaggValues(v.getRight()[j], - sb, - colGroups.get(j).getValues(), - 0, - that.getNumColumns(), - that.getNumColumns(), - new double[v.getRight()[j] * that.getNumColumns()]); - retCg.add(g.copyAndSet(newColIndexes, preAggregatedB)); - } - } - else { - ExecutorService pool = CommonThreadPool.get(k); - ArrayList<RightMatrixPreAggregateSparseTask> preTask = new ArrayList<>(colGroups.size()); - - try { - List<Future<double[]>> ag = pool.invokeAll(preAggregate(colGroups, sb, that, preTask, v)); - for(int j = 0; j < colGroups.size(); j++) { - retCg.add(((ColGroupValue) colGroups.get(j)).copyAndSet(newColIndexes, ag.get(j).get())); - } - } - catch(InterruptedException | ExecutionException e) { - throw new DMLRuntimeException(e); - } - } - ret.allocateColGroupList(retCg); - ret.setOverlapping(true); - ret.setNonZeros(-1); - - return ret; - } - - private static ArrayList<RightMatrixPreAggregateTask> preAggregate(List<ColGroup> colGroups, double[] thatV, - MatrixBlock that, ArrayList<RightMatrixPreAggregateTask> preTask, Pair<Integer, int[]> v) { - preTask.clear(); - for(int h = 0; h < colGroups.size(); h++) { - RightMatrixPreAggregateTask pAggT = new RightMatrixPreAggregateTask((ColGroupValue) colGroups.get(h), - v.getRight()[h], thatV, colGroups.get(h).getValues(), 0, that.getNumColumns(), that.getNumColumns()); - preTask.add(pAggT); - } - return preTask; - } - - private static ArrayList<RightMatrixPreAggregateSparseTask> preAggregate(List<ColGroup> colGroups, SparseBlock sb, - MatrixBlock that, ArrayList<RightMatrixPreAggregateSparseTask> preTask, Pair<Integer, int[]> v) { - preTask.clear(); - for(int h = 0; h < colGroups.size(); h++) { - RightMatrixPreAggregateSparseTask pAggT = new RightMatrixPreAggregateSparseTask( - (ColGroupValue) colGroups.get(h), v.getRight()[h], sb, colGroups.get(h).getValues(), 0, - that.getNumColumns(), that.getNumColumns()); - preTask.add(pAggT); - } - return preTask; - } - - private static void rightMultByVector(List<ColGroup> groups, MatrixBlock vect, MatrixBlock ret, int rl, int ru, - Pair<Integer, int[]> v) { - // + 1 to enable containing a single 0 value in the dictionary that was not materialized. - // This is to handle the case of a DDC dictionary not materializing the zero values. - // A fine tradeoff! - ColGroupValue.setupThreadLocalMemory(v.getLeft() + 1); - - // boolean cacheDDC1 = ru - rl > CompressionSettings.BITMAP_BLOCK_SZ * 2; - - // process uncompressed column group (overwrites output) - // if(inclUC) { - for(ColGroup grp : groups) { - if(grp instanceof ColGroupUncompressed) - ((ColGroupUncompressed) grp).rightMultByVector(vect, ret, rl, ru); - } - - // process cache-conscious DDC1 groups (adds to output) - - // if(cacheDDC1) { - // ArrayList<ColGroupDDC1> tmp = new ArrayList<>(); - // for(ColGroup grp : groups) - // if(grp instanceof ColGroupDDC1) - // tmp.add((ColGroupDDC1) grp); - // if(!tmp.isEmpty()) - // ColGroupDDC1.rightMultByVector(tmp.toArray(new ColGroupDDC1[0]), vect, ret, rl, ru); - // } - // process remaining groups (adds to output) - double[] values = ret.getDenseBlockValues(); - for(ColGroup grp : groups) { - if(!(grp instanceof ColGroupUncompressed)) { - grp.rightMultByVector(vect.getDenseBlockValues(), values, rl, ru, grp.getValues()); - } - } - - ColGroupValue.cleanupThreadLocalMemory(); - - } - - private static class RightMatrixMultTask implements Callable<Object> { - private final List<ColGroup> _colGroups; - // private final double[] _thatV; - private final double[] _retV; - private final List<Future<double[]>> _aggB; - private final Pair<Integer, int[]> _v; - private final int _numColumns; - - private final int _rl; - private final int _ru; - private final int _cl; - private final int _cu; - private final boolean _mem; - private final boolean _skipOle; - - protected RightMatrixMultTask(List<ColGroup> groups, double[] retV, List<Future<double[]>> aggB, - Pair<Integer, int[]> v, int numColumns, int rl, int ru, int cl, int cu, boolean mem, boolean skipOle) { - _colGroups = groups; - // _thatV = thatV; - _retV = retV; - _aggB = aggB; - _v = v; - _numColumns = numColumns; - _rl = rl; - _ru = ru; - _cl = cl; - _cu = cu; - _mem = mem; - _skipOle = skipOle; - } - - @Override - public Object call() { - try { - if(_mem) - ColGroupValue.setupThreadLocalMemory((_v.getLeft())); - for(int j = 0; j < _colGroups.size(); j++) { - if(_colGroups.get(j) instanceof ColGroupOLE) { - if(_skipOle) { - _colGroups.get(j) - .rightMultByMatrix(_aggB.get(j).get(), _retV, _numColumns, _rl, _ru, _cl, _cu); - } - } - else { - if(!_skipOle) { - _colGroups.get(j) - .rightMultByMatrix(_aggB.get(j).get(), _retV, _numColumns, _rl, _ru, _cl, _cu); - } - } - } - if(_mem) - ColGroupValue.cleanupThreadLocalMemory(); - return null; - } - catch(Exception e) { - LOG.error(e); - throw new DMLRuntimeException(e); - } - } - } - - private static class RightMatrixPreAggregateTask implements Callable<double[]> { - private final ColGroupValue _colGroup; - private final int _numVals; - private final double[] _b; - private final double[] _dict; - - private final int _cl; - private final int _cu; - private final int _cut; - - protected RightMatrixPreAggregateTask(ColGroupValue colGroup, int numVals, double[] b, double[] dict, int cl, - int cu, int cut) { - _colGroup = colGroup; - _numVals = numVals; - _b = b; - _dict = dict; - _cl = cl; - _cu = cu; - _cut = cut; - } - - @Override - public double[] call() { - try { - return _colGroup.preaggValues(_numVals, _b, _dict, _cl, _cu, _cut); - } - catch(Exception e) { - LOG.error(e); - throw new DMLRuntimeException(e); - } - } - } - - private static class RightMatrixPreAggregateSparseTask implements Callable<double[]> { - private final ColGroupValue _colGroup; - private final int _numVals; - private final SparseBlock _b; - private final double[] _dict; - - private final int _cl; - private final int _cu; - private final int _cut; - - protected RightMatrixPreAggregateSparseTask(ColGroupValue colGroup, int numVals, SparseBlock b, double[] dict, - int cl, int cu, int cut) { - _colGroup = colGroup; - _numVals = numVals; - _b = b; - _dict = dict; - _cl = cl; - _cu = cu; - _cut = cut; - } - - @Override - public double[] call() { - try { - return _colGroup.preaggValues(_numVals, _b, _dict, _cl, _cu, _cut); - } - catch(Exception e) { - LOG.error(e); - throw new DMLRuntimeException(e); - } - } - } - - private static class RightMatrixVectorMultTask implements Callable<Long> { - private final List<ColGroup> _groups; - private final MatrixBlock _vect; - private final MatrixBlock _ret; - private final int _rl; - private final int _ru; - private final Pair<Integer, int[]> _v; - - protected RightMatrixVectorMultTask(List<ColGroup> groups, MatrixBlock vect, MatrixBlock ret, int rl, int ru, - Pair<Integer, int[]> v) { - _groups = groups; - _vect = vect; - _ret = ret; - _rl = rl; - _ru = ru; - _v = v; - } - - @Override - public Long call() { - try { - rightMultByVector(_groups, _vect, _ret, _rl, _ru, _v); - return _ret.recomputeNonZeros(_rl, _ru - 1, 0, 0); - } - catch(Exception e) { - LOG.error(e); - throw new DMLRuntimeException(e); - } - } - } + private static final Log LOG = LogFactory.getLog(LibRightMultBy.class.getName()); + + /** + * Right multiply by matrix. Meaning a left hand side compressed matrix is multiplied with a right hand side + * uncompressed matrix. + * + * @param colGroups All Column groups in the compression + * @param that The right hand side matrix + * @param ret The MatrixBlock to return. + * @param k The parallelization degree to use. + * @param v The Precalculated counts and Maximum number of tuple entries in the column groups. + * @param allowOverlap Allow the multiplication to return an overlapped matrix. + * @return The Result Matrix, modified from the ret parameter. + */ + public static MatrixBlock rightMultByMatrix(List<ColGroup> colGroups, MatrixBlock that, MatrixBlock ret, int k, + Pair<Integer, int[]> v, boolean allowOverlap) { + + boolean containsUncompressable = false; + int distinctCount = 0; + for(ColGroup g : colGroups) { + if(g instanceof ColGroupValue) { + distinctCount += ((ColGroupValue) g).getNumValues(); + } + else { + containsUncompressable = true; + } + } + int rl = colGroups.get(0).getNumRows(); + int cl = that.getNumColumns(); + if(!allowOverlap || (containsUncompressable || distinctCount >= rl / 2)) { + if(ret == null) + ret = new MatrixBlock(rl, cl, false, rl * cl); + else if(!(ret.getNumColumns() == cl && ret.getNumRows() == rl && ret.isAllocated())) + ret.reset(rl, cl, false, rl * cl); + ret.allocateDenseBlock(); + if(that.isInSparseFormat()) { + ret = rightMultBySparseMatrix(colGroups, that, ret, k, v); + } + else { + ret = rightMultByDenseMatrix(colGroups, that, ret, k, v); + + } + ret.setNonZeros(ret.getNumColumns() * ret.getNumRows()); + } + else { + // Create an overlapping compressed Matrix Block. + ret = new CompressedMatrixBlock(true); + + ret.setNumColumns(cl); + ret.setNumRows(rl); + CompressedMatrixBlock retC = (CompressedMatrixBlock) ret; + retC.setOverlapping(true); + if(that.isInSparseFormat()) { + ret = rightMultBySparseMatrixCompressed(colGroups, that, retC, k, v); + } + else { + ret = rightMultByDenseMatrixCompressed(colGroups, that, retC, k, v); + } + } + + return ret; + + } + + /** + * Multi-threaded version of rightMultByVector. + * + * @param colGroups The Column groups used int the multiplication + * @param vector matrix block vector to multiply with + * @param result matrix block result to modify in the multiplication + * @param k number of threads to use + * @param v The Precalculated counts and Maximum number of tuple entries in the column groups + */ + public static void rightMultByVector(List<ColGroup> colGroups, MatrixBlock vector, MatrixBlock result, int k, + Pair<Integer, int[]> v) { + // initialize and allocate the result + result.allocateDenseBlock(); + if(k <= 1) { + rightMultByVector(colGroups, vector, result, v); + return; + } + + // multi-threaded execution of all groups + try { + // ColGroupUncompressed uc = getUncompressedColGroup(); + + // compute uncompressed column group in parallel + // if(uc != null) + // uc.rightMultByVector(vector, result, k); + + // compute remaining compressed column groups in parallel + // note: OLE needs alignment to segment size, otherwise wrong entry + ExecutorService pool = CommonThreadPool.get(k); + int rlen = colGroups.get(0).getNumRows(); + int seqsz = CompressionSettings.BITMAP_BLOCK_SZ; + int blklen = (int) (Math.ceil((double) rlen / k)); + blklen += (blklen % seqsz != 0) ? seqsz - blklen % seqsz : 0; + + ArrayList<RightMatrixVectorMultTask> tasks = new ArrayList<>(); + for(int i = 0; i < k & i * blklen < rlen; i++) { + tasks.add(new RightMatrixVectorMultTask(colGroups, vector, result, i * blklen, + Math.min((i + 1) * blklen, rlen), v)); + } + + List<Future<Long>> ret = pool.invokeAll(tasks); + pool.shutdown(); + + // error handling and nnz aggregation + long lnnz = 0; + for(Future<Long> tmp : ret) + lnnz += tmp.get(); + result.setNonZeros(lnnz); + } + catch(InterruptedException | ExecutionException e) { + throw new DMLRuntimeException(e); + } + } + + /** + * Multiply this matrix block by a column vector on the right. + * + * @param vector right-hand operand of the multiplication + * @param result buffer to hold the result; must have the appropriate size already + * @param v The Precalculated counts and Maximum number of tuple entries in the column groups. + */ + private static void rightMultByVector(List<ColGroup> colGroups, MatrixBlock vector, MatrixBlock result, + Pair<Integer, int[]> v) { + + // delegate matrix-vector operation to each column group + rightMultByVector(colGroups, vector, result, 0, result.getNumRows(), v); + + // post-processing + result.recomputeNonZeros(); + } + + private static MatrixBlock rightMultBySparseMatrix(List<ColGroup> colGroups, MatrixBlock that, MatrixBlock ret, + int k, Pair<Integer, int[]> v) { + SparseBlock sb = that.getSparseBlock(); + double[] retV = ret.getDenseBlockValues(); + + if(sb == null) + throw new DMLRuntimeException("Invalid Right Mult by Sparse matrix, input matrix was dense"); + + for(ColGroup grp : colGroups) { + if(grp instanceof ColGroupUncompressed) + ((ColGroupUncompressed) grp).rightMultByMatrix(that, ret, 0, ret.getNumColumns()); + } + + // Pair<Integer, int[]> v = Util.getMaxNumValues(colGroups); + // if(k == 1) { + for(int j = 0; j < colGroups.size(); j++) { + double[] preAggregatedB = ((ColGroupValue) colGroups.get(j)).preaggValues(v.getRight()[j], + sb, + colGroups.get(j).getValues(), + 0, + that.getNumColumns(), + that.getNumColumns()); + colGroups.get(j).rightMultByMatrix(preAggregatedB, + retV, + that.getNumColumns(), + 0, + ret.getNumRows(), + 0, + that.getNumColumns()); + + } + // } + // else { + // ExecutorService pool = CommonThreadPool.get(k); + // ArrayList<RightMultBySparseMatrixTask> tasks = new ArrayList<>(); + // try { + + // for(int j = 0; j < ret.getNumColumns(); j += CompressionSettings.BITMAP_BLOCK_SZ) { + // tasks.add(new RightMultBySparseMatrixTask(colGroups, retV, sb, materialized, v, numColumns, j, + // Math.min(j + CompressionSettings.BITMAP_BLOCK_SZ, ret.getNumColumns()))); + // } + + // List<Future<Object>> futures = pool.invokeAll(tasks); + // pool.shutdown(); + // for(Future<Object> future : futures) + // future.get(); + // } + // catch(InterruptedException | ExecutionException e) { + // throw new DMLRuntimeException(e); + // } + // } + + return ret; + } + + private static MatrixBlock rightMultByDenseMatrix(List<ColGroup> colGroups, MatrixBlock that, MatrixBlock ret, + int k, Pair<Integer, int[]> v) { + + // long StartTime = System.currentTimeMillis(); + DenseBlock db = that.getDenseBlock(); + double[] retV = ret.getDenseBlockValues(); + double[] thatV; + + for(ColGroup grp : colGroups) { + if(grp instanceof ColGroupUncompressed) { + ((ColGroupUncompressed) grp).rightMultByMatrix(that, ret, 0, ret.getNumRows()); + } + } + + if(k == 1) { + ColGroupValue.setupThreadLocalMemory((v.getLeft())); + for(int b = 0; b < db.numBlocks(); b++) { + // int blockSize = db.blockSize(b); + thatV = db.valuesAt(b); + for(int j = 0; j < colGroups.size(); j++) { + int colBlockSize = 128; + for(int i = 0; i < that.getNumColumns(); i += colBlockSize) { + if(colGroups.get(j) instanceof ColGroupValue) { + double[] preAggregatedB = ((ColGroupValue) colGroups.get(j)).preaggValues(v.getRight()[j], + thatV, + colGroups.get(j).getValues(), + i, + Math.min(i + colBlockSize, that.getNumColumns()), + that.getNumColumns()); + int blklenRows = CompressionSettings.BITMAP_BLOCK_SZ; + for(int n = 0; n * blklenRows < ret.getNumRows(); n++) { + colGroups.get(j).rightMultByMatrix(preAggregatedB, + retV, + that.getNumColumns(), + n * blklenRows, + Math.min((n + 1) * blklenRows, ret.getNumRows()), + i, + Math.min(i + colBlockSize, that.getNumColumns())); + } + } + } + } + } + ColGroupValue.cleanupThreadLocalMemory(); + } + else { + + thatV = db.valuesAt(0); + ExecutorService pool = CommonThreadPool.get(k); + ArrayList<RightMatrixMultTask> tasks = new ArrayList<>(); + ArrayList<RightMatrixPreAggregateTask> preTask = new ArrayList<>(colGroups.size()); + // Pair<Integer, int[]> v; + final int blkz = CompressionSettings.BITMAP_BLOCK_SZ; + int blklenRows = (int) (Math.ceil((double) ret.getNumRows() / (2 * k))); + + try { + List<Future<double[]>> ag = pool.invokeAll(preAggregate(colGroups, thatV, that, preTask, v)); + // DDC and RLE + for(int j = 0; j * blklenRows < ret.getNumRows(); j++) { + RightMatrixMultTask rmmt = new RightMatrixMultTask(colGroups, retV, ag, v, that.getNumColumns(), + j * blklenRows, Math.min((j + 1) * blklenRows, ret.getNumRows()), 0, that.getNumColumns(), + false, false); + tasks.add(rmmt); + } + blklenRows += (blklenRows % blkz != 0) ? blkz - blklenRows % blkz : 0; + // OLE! + for(int j = 0; j * blklenRows < ret.getNumRows(); j++) { + RightMatrixMultTask rmmt = new RightMatrixMultTask(colGroups, retV, ag, v, that.getNumColumns(), + j * blklenRows, Math.min((j + 1) * blklenRows, ret.getNumRows()), 0, that.getNumColumns(), + false, true); + tasks.add(rmmt); + } + for(Future<Object> future : pool.invokeAll(tasks)) + future.get(); + tasks.clear(); + + } + catch(InterruptedException | ExecutionException e) { + throw new DMLRuntimeException(e); + } + } + + return ret; + } + + private static MatrixBlock rightMultByDenseMatrixCompressed(List<ColGroup> colGroups, MatrixBlock that, + CompressedMatrixBlock ret, int k, Pair<Integer, int[]> v) { + + DenseBlock db = that.getDenseBlock(); + double[] thatV; + + for(ColGroup grp : colGroups) { + if(grp instanceof ColGroupUncompressed) { + throw new DMLCompressionException( + "Right Mult by dense with compressed output is not efficient to do with uncompressed Compressed ColGroups and therefore not supported."); + } + } + + thatV = db.valuesAt(0); + List<ColGroup> retCg = new ArrayList<>(); + int[] newColIndexes = new int[that.getNumColumns()]; + for(int i = 0; i < that.getNumColumns(); i++) { + newColIndexes[i] = i; + } + if(k == 1) { + for(int j = 0; j < colGroups.size(); j++) { + ColGroupValue g = (ColGroupValue) colGroups.get(j); + double[] preAggregatedB = g.preaggValues(v.getRight()[j], + thatV, + g.getValues(), + 0, + that.getNumColumns(), + that.getNumColumns(), + new double[v.getRight()[j] * that.getNumColumns()]); + retCg.add(g.copyAndSet(newColIndexes, preAggregatedB)); + } + } + else { + thatV = db.valuesAt(0); + ExecutorService pool = CommonThreadPool.get(k); + ArrayList<RightMatrixPreAggregateTask> preTask = new ArrayList<>(colGroups.size()); + + try { + List<Future<double[]>> ag = pool.invokeAll(preAggregate(colGroups, thatV, that, preTask, v)); + for(int j = 0; j < colGroups.size(); j++) { + retCg.add(((ColGroupValue) colGroups.get(j)).copyAndSet(newColIndexes, ag.get(j).get())); + } + } + catch(InterruptedException | ExecutionException e) { + throw new DMLRuntimeException(e); + } + } + ret.allocateColGroupList(retCg); + ret.setOverlapping(true); + ret.setNonZeros(-1); + + return ret; + } + + private static MatrixBlock rightMultBySparseMatrixCompressed(List<ColGroup> colGroups, MatrixBlock that, + CompressedMatrixBlock ret, int k, Pair<Integer, int[]> v) { + + // long StartTime = System.currentTimeMillis(); + SparseBlock sb = that.getSparseBlock(); + + for(ColGroup grp : colGroups) { + if(grp instanceof ColGroupUncompressed) { + throw new DMLCompressionException( + "Right Mult by dense with compressed output is not efficient to do with uncompressed Compressed ColGroups and therefore not supported."); + } + } + + List<ColGroup> retCg = new ArrayList<>(); + int[] newColIndexes = new int[that.getNumColumns()]; + for(int i = 0; i < that.getNumColumns(); i++) { + newColIndexes[i] = i; + } + if(k == 1) { + for(int j = 0; j < colGroups.size(); j++) { + ColGroupValue g = (ColGroupValue) colGroups.get(j); + double[] preAggregatedB = g.preaggValues(v.getRight()[j], + sb, + colGroups.get(j).getValues(), + 0, + that.getNumColumns(), + that.getNumColumns(), + new double[v.getRight()[j] * that.getNumColumns()]); + retCg.add(g.copyAndSet(newColIndexes, preAggregatedB)); + } + } + else { + ExecutorService pool = CommonThreadPool.get(k); + ArrayList<RightMatrixPreAggregateSparseTask> preTask = new ArrayList<>(colGroups.size()); + + try { + List<Future<double[]>> ag = pool.invokeAll(preAggregate(colGroups, sb, that, preTask, v)); + for(int j = 0; j < colGroups.size(); j++) { + retCg.add(((ColGroupValue) colGroups.get(j)).copyAndSet(newColIndexes, ag.get(j).get())); + } + } + catch(InterruptedException | ExecutionException e) { + throw new DMLRuntimeException(e); + } + } + ret.allocateColGroupList(retCg); + ret.setOverlapping(true); + ret.setNonZeros(-1); + + return ret; + } + + private static ArrayList<RightMatrixPreAggregateTask> preAggregate(List<ColGroup> colGroups, double[] thatV, + MatrixBlock that, ArrayList<RightMatrixPreAggregateTask> preTask, Pair<Integer, int[]> v) { + preTask.clear(); + for(int h = 0; h < colGroups.size(); h++) { + RightMatrixPreAggregateTask pAggT = new RightMatrixPreAggregateTask((ColGroupValue) colGroups.get(h), + v.getRight()[h], thatV, colGroups.get(h).getValues(), 0, that.getNumColumns(), that.getNumColumns()); + preTask.add(pAggT); + } + return preTask; + } + + private static ArrayList<RightMatrixPreAggregateSparseTask> preAggregate(List<ColGroup> colGroups, SparseBlock sb, + MatrixBlock that, ArrayList<RightMatrixPreAggregateSparseTask> preTask, Pair<Integer, int[]> v) { + preTask.clear(); + for(int h = 0; h < colGroups.size(); h++) { + RightMatrixPreAggregateSparseTask pAggT = new RightMatrixPreAggregateSparseTask( + (ColGroupValue) colGroups.get(h), v.getRight()[h], sb, colGroups.get(h).getValues(), 0, + that.getNumColumns(), that.getNumColumns()); + preTask.add(pAggT); + } + return preTask; + } + + private static void rightMultByVector(List<ColGroup> groups, MatrixBlock vect, MatrixBlock ret, int rl, int ru, + Pair<Integer, int[]> v) { + // + 1 to enable containing a single 0 value in the dictionary that was not materialized. + // This is to handle the case of a DDC dictionary not materializing the zero values. + // A fine tradeoff! + ColGroupValue.setupThreadLocalMemory(v.getLeft() + 1); + + // boolean cacheDDC1 = ru - rl > CompressionSettings.BITMAP_BLOCK_SZ * 2; + + // process uncompressed column group (overwrites output) + // if(inclUC) { + for(ColGroup grp : groups) { + if(grp instanceof ColGroupUncompressed) + ((ColGroupUncompressed) grp).rightMultByVector(vect, ret, rl, ru); + } + + // process cache-conscious DDC1 groups (adds to output) + + // if(cacheDDC1) { + // ArrayList<ColGroupDDC1> tmp = new ArrayList<>(); + // for(ColGroup grp : groups) + // if(grp instanceof ColGroupDDC1) + // tmp.add((ColGroupDDC1) grp); + // if(!tmp.isEmpty()) + // ColGroupDDC1.rightMultByVector(tmp.toArray(new ColGroupDDC1[0]), vect, ret, rl, ru); + // } + // process remaining groups (adds to output) + double[] values = ret.getDenseBlockValues(); + for(ColGroup grp : groups) { + if(!(grp instanceof ColGroupUncompressed)) { + grp.rightMultByVector(vect.getDenseBlockValues(), values, rl, ru, grp.getValues()); + } + } + + ColGroupValue.cleanupThreadLocalMemory(); + + } + + private static class RightMatrixMultTask implements Callable<Object> { + private final List<ColGroup> _colGroups; + // private final double[] _thatV; + private final double[] _retV; + private final List<Future<double[]>> _aggB; + private final Pair<Integer, int[]> _v; + private final int _numColumns; + + private final int _rl; + private final int _ru; + private final int _cl; + private final int _cu; + private final boolean _mem; + private final boolean _skipOle; + + protected RightMatrixMultTask(List<ColGroup> groups, double[] retV, List<Future<double[]>> aggB, + Pair<Integer, int[]> v, int numColumns, int rl, int ru, int cl, int cu, boolean mem, boolean skipOle) { + _colGroups = groups; + // _thatV = thatV; + _retV = retV; + _aggB = aggB; + _v = v; + _numColumns = numColumns; + _rl = rl; + _ru = ru; + _cl = cl; + _cu = cu; + _mem = mem; + _skipOle = skipOle; + } + + @Override + public Object call() { + try { + if(_mem) + ColGroupValue.setupThreadLocalMemory((_v.getLeft())); + for(int j = 0; j < _colGroups.size(); j++) { + if(_colGroups.get(j) instanceof ColGroupOLE) { + if(_skipOle) { + _colGroups.get(j) + .rightMultByMatrix(_aggB.get(j).get(), _retV, _numColumns, _rl, _ru, _cl, _cu); + } + } + else { + if(!_skipOle) { + _colGroups.get(j) + .rightMultByMatrix(_aggB.get(j).get(), _retV, _numColumns, _rl, _ru, _cl, _cu); + } + } + } + if(_mem) + ColGroupValue.cleanupThreadLocalMemory(); + return null; + } + catch(Exception e) { + LOG.error(e); + throw new DMLRuntimeException(e); + } + } + } + + private static class RightMatrixPreAggregateTask implements Callable<double[]> { + private final ColGroupValue _colGroup; + private final int _numVals; + private final double[] _b; + private final double[] _dict; + + private final int _cl; + private final int _cu; + private final int _cut; + + protected RightMatrixPreAggregateTask(ColGroupValue colGroup, int numVals, double[] b, double[] dict, int cl, + int cu, int cut) { + _colGroup = colGroup; + _numVals = numVals; + _b = b; + _dict = dict; + _cl = cl; + _cu = cu; + _cut = cut; + } + + @Override + public double[] call() { + try { + return _colGroup.preaggValues(_numVals, _b, _dict, _cl, _cu, _cut); + } + catch(Exception e) { + LOG.error(e); + throw new DMLRuntimeException(e); + } + } + } + + private static class RightMatrixPreAggregateSparseTask implements Callable<double[]> { + private final ColGroupValue _colGroup; + private final int _numVals; + private final SparseBlock _b; + private final double[] _dict; + + private final int _cl; + private final int _cu; + private final int _cut; + + protected RightMatrixPreAggregateSparseTask(ColGroupValue colGroup, int numVals, SparseBlock b, double[] dict, + int cl, int cu, int cut) { + _colGroup = colGroup; + _numVals = numVals; + _b = b; + _dict = dict; + _cl = cl; + _cu = cu; + _cut = cut; + } + + @Override + public double[] call() { + try { + return _colGroup.preaggValues(_numVals, _b, _dict, _cl, _cu, _cut); + } + catch(Exception e) { + LOG.error(e); + throw new DMLRuntimeException(e); + } + } + } + + private static class RightMatrixVectorMultTask implements Callable<Long> { + private final List<ColGroup> _groups; + private final MatrixBlock _vect; + private final MatrixBlock _ret; + private final int _rl; + private final int _ru; + private final Pair<Integer, int[]> _v; + + protected RightMatrixVectorMultTask(List<ColGroup> groups, MatrixBlock vect, MatrixBlock ret, int rl, int ru, + Pair<Integer, int[]> v) { + _groups = groups; + _vect = vect; + _ret = ret; + _rl = rl; + _ru = ru; + _v = v; + } + + @Override + public Long call() { + try { + rightMultByVector(_groups, _vect, _ret, _rl, _ru, _v); + return _ret.recomputeNonZeros(_rl, _ru - 1, 0, 0); + } + catch(Exception e) { + LOG.error(e); + throw new DMLRuntimeException(e); + } + } + } } diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/LibScalar.java b/src/main/java/org/apache/sysds/runtime/compress/lib/LibScalar.java index f555513..4f9020a 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/lib/LibScalar.java +++ b/src/main/java/org/apache/sysds/runtime/compress/lib/LibScalar.java @@ -47,148 +47,148 @@ import org.apache.sysds.runtime.util.CommonThreadPool; public class LibScalar { - // private static final Log LOG = LogFactory.getLog(LibScalar.class.getName()); - private static final int MINIMUM_PARALLEL_SIZE = 8096; - - public static MatrixBlock scalarOperations(ScalarOperator sop, CompressedMatrixBlock m1, CompressedMatrixBlock ret, - boolean overlapping) { - // LOG.error(sop); - if(sop instanceof LeftScalarOperator) { - if(sop.fn instanceof Minus) { - m1 = (CompressedMatrixBlock) scalarOperations(new RightScalarOperator(Multiply.getMultiplyFnObject(), - -1), m1, ret, overlapping); - return scalarOperations(new RightScalarOperator(Plus.getPlusFnObject(), sop.getConstant()), - m1, - ret, - overlapping); - } - else if(sop.fn instanceof Power2) { - throw new DMLCompressionException("Left Power does not make sense."); - // List<ColGroup> newColGroups = new ArrayList<>(); - // double v = sop.executeScalar(0); - - // double[] values = new double[m1.getNumColumns()]; - // Arrays.fill(values, v); - - // int[] colIndexes = new int[m1.getNumColumns()]; - // for(int i = 0; i < colIndexes.length; i++) { - // colIndexes[i] = i; - // } - // newColGroups.add(new ColGroupConst(colIndexes, ret.getNumRows(), new Dictionary(values))); - // ret.allocateColGroupList(newColGroups); - // ret.setNonZeros(ret.getNumColumns() * ret.getNumRows()); - // return ret; - } - - } - - List<ColGroup> colGroups = m1.getColGroups(); - if(overlapping && !(sop.fn instanceof Multiply)) { - if(sop.fn instanceof Plus || sop.fn instanceof Minus) { - - // If the colGroup is overlapping we know there are no incompressable colGroups. - List<ColGroup> newColGroups = new ArrayList<>(); - for(ColGroup grp : colGroups) { - ColGroupValue g = (ColGroupValue) grp; - newColGroups.add(g.copy()); - } - int[] colIndexes = newColGroups.get(0).getColIndices(); - double v = sop.executeScalar(0); - double[] values = new double[colIndexes.length]; - Arrays.fill(values, v); - newColGroups.add(new ColGroupConst(colIndexes, ret.getNumRows(), new Dictionary(values))); - ret.allocateColGroupList(newColGroups); - ret.setOverlapping(true); - ret.setNonZeros(-1); - } - } - else { - - if(sop.getNumThreads() > 1) { - parallelScalarOperations(sop, colGroups, ret, sop.getNumThreads()); - } - else { - // Apply the operation to each of the column groups. - // Most implementations will only modify metadata. - List<ColGroup> newColGroups = new ArrayList<>(); - for(ColGroup grp : colGroups) { - newColGroups.add(grp.scalarOperation(sop)); - } - ret.allocateColGroupList(newColGroups); - } - ret.setNonZeros(-1); - ret.setOverlapping(m1.isOverlapping()); - } - - return ret; - - } - - private static void parallelScalarOperations(ScalarOperator sop, List<ColGroup> colGroups, - CompressedMatrixBlock ret, int k) { - ExecutorService pool = CommonThreadPool.get(k); - List<ScalarTask> tasks = partition(sop, colGroups); - try { - List<Future<List<ColGroup>>> rtasks = pool.invokeAll(tasks); - pool.shutdown(); - List<ColGroup> newColGroups = new ArrayList<>(); - for(Future<List<ColGroup>> f : rtasks) { - newColGroups.addAll(f.get()); - } - ret.allocateColGroupList(newColGroups); - } - catch(InterruptedException | ExecutionException e) { - throw new DMLRuntimeException(e); - } - } - - private static List<ScalarTask> partition(ScalarOperator sop, List<ColGroup> colGroups) { - ArrayList<ScalarTask> tasks = new ArrayList<>(); - ArrayList<ColGroup> small = new ArrayList<>(); - for(ColGroup grp : colGroups) { - if(grp instanceof ColGroupUncompressed) { - ArrayList<ColGroup> uc = new ArrayList<>(); - uc.add(grp); - tasks.add(new ScalarTask(uc, sop)); - } - else { - int nv = ((ColGroupValue) grp).getNumValues() * grp.getColIndices().length; - if(nv < MINIMUM_PARALLEL_SIZE) { - small.add(grp); - } - else { - ArrayList<ColGroup> large = new ArrayList<>(); - large.add(grp); - tasks.add(new ScalarTask(large, sop)); - } - } - if(small.size() > 10) { - tasks.add(new ScalarTask(small, sop)); - small = new ArrayList<>(); - } - } - if(small.size() > 0) { - tasks.add(new ScalarTask(small, sop)); - } - return tasks; - } - - private static class ScalarTask implements Callable<List<ColGroup>> { - private final List<ColGroup> _colGroups; - private final ScalarOperator _sop; - - protected ScalarTask(List<ColGroup> colGroups, ScalarOperator sop) { - _colGroups = colGroups; - _sop = sop; - } - - @Override - public List<ColGroup> call() { - List<ColGroup> res = new ArrayList<>(); - for(ColGroup x : _colGroups) { - res.add(x.scalarOperation(_sop)); - } - return res; - } - } + // private static final Log LOG = LogFactory.getLog(LibScalar.class.getName()); + private static final int MINIMUM_PARALLEL_SIZE = 8096; + + public static MatrixBlock scalarOperations(ScalarOperator sop, CompressedMatrixBlock m1, + CompressedMatrixBlock ret, boolean overlapping) + { + if(sop instanceof LeftScalarOperator) { + if(sop.fn instanceof Minus) { + m1 = (CompressedMatrixBlock) scalarOperations(new RightScalarOperator(Multiply.getMultiplyFnObject(), + -1), m1, ret, overlapping); + return scalarOperations(new RightScalarOperator(Plus.getPlusFnObject(), sop.getConstant()), + m1, + ret, + overlapping); + } + else if(sop.fn instanceof Power2) { + throw new DMLCompressionException("Left Power does not make sense."); + // List<ColGroup> newColGroups = new ArrayList<>(); + // double v = sop.executeScalar(0); + + // double[] values = new double[m1.getNumColumns()]; + // Arrays.fill(values, v); + + // int[] colIndexes = new int[m1.getNumColumns()]; + // for(int i = 0; i < colIndexes.length; i++) { + // colIndexes[i] = i; + // } + // newColGroups.add(new ColGroupConst(colIndexes, ret.getNumRows(), new Dictionary(values))); + // ret.allocateColGroupList(newColGroups); + // ret.setNonZeros(ret.getNumColumns() * ret.getNumRows()); + // return ret; + } + + } + + List<ColGroup> colGroups = m1.getColGroups(); + if(overlapping && !(sop.fn instanceof Multiply)) { + if(sop.fn instanceof Plus || sop.fn instanceof Minus) { + + // If the colGroup is overlapping we know there are no incompressable colGroups. + List<ColGroup> newColGroups = new ArrayList<>(); + for(ColGroup grp : colGroups) { + ColGroupValue g = (ColGroupValue) grp; + newColGroups.add(g.copy()); + } + int[] colIndexes = newColGroups.get(0).getColIndices(); + double v = sop.executeScalar(0); + double[] values = new double[colIndexes.length]; + Arrays.fill(values, v); + newColGroups.add(new ColGroupConst(colIndexes, ret.getNumRows(), new Dictionary(values))); + ret.allocateColGroupList(newColGroups); + ret.setOverlapping(true); + ret.setNonZeros(-1); + } + } + else { + + if(sop.getNumThreads() > 1) { + parallelScalarOperations(sop, colGroups, ret, sop.getNumThreads()); + } + else { + // Apply the operation to each of the column groups. + // Most implementations will only modify metadata. + List<ColGroup> newColGroups = new ArrayList<>(); + for(ColGroup grp : colGroups) { + newColGroups.add(grp.scalarOperation(sop)); + } + ret.allocateColGroupList(newColGroups); + } + ret.setNonZeros(-1); + ret.setOverlapping(m1.isOverlapping()); + } + + return ret; + + } + + private static void parallelScalarOperations(ScalarOperator sop, List<ColGroup> colGroups, + CompressedMatrixBlock ret, int k) { + ExecutorService pool = CommonThreadPool.get(k); + List<ScalarTask> tasks = partition(sop, colGroups); + try { + List<Future<List<ColGroup>>> rtasks = pool.invokeAll(tasks); + pool.shutdown(); + List<ColGroup> newColGroups = new ArrayList<>(); + for(Future<List<ColGroup>> f : rtasks) { + newColGroups.addAll(f.get()); + } + ret.allocateColGroupList(newColGroups); + } + catch(InterruptedException | ExecutionException e) { + throw new DMLRuntimeException(e); + } + } + + private static List<ScalarTask> partition(ScalarOperator sop, List<ColGroup> colGroups) { + ArrayList<ScalarTask> tasks = new ArrayList<>(); + ArrayList<ColGroup> small = new ArrayList<>(); + for(ColGroup grp : colGroups) { + if(grp instanceof ColGroupUncompressed) { + ArrayList<ColGroup> uc = new ArrayList<>(); + uc.add(grp); + tasks.add(new ScalarTask(uc, sop)); + } + else { + int nv = ((ColGroupValue) grp).getNumValues() * grp.getColIndices().length; + if(nv < MINIMUM_PARALLEL_SIZE) { + small.add(grp); + } + else { + ArrayList<ColGroup> large = new ArrayList<>(); + large.add(grp); + tasks.add(new ScalarTask(large, sop)); + } + } + if(small.size() > 10) { + tasks.add(new ScalarTask(small, sop)); + small = new ArrayList<>(); + } + } + if(small.size() > 0) { + tasks.add(new ScalarTask(small, sop)); + } + return tasks; + } + + private static class ScalarTask implements Callable<List<ColGroup>> { + private final List<ColGroup> _colGroups; + private final ScalarOperator _sop; + + protected ScalarTask(List<ColGroup> colGroups, ScalarOperator sop) { + _colGroups = colGroups; + _sop = sop; + } + + @Override + public List<ColGroup> call() { + List<ColGroup> res = new ArrayList<>(); + for(ColGroup x : _colGroups) { + res.add(x.scalarOperation(_sop)); + } + return res; + } + } } diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/FederatedPSControlThread.java b/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/FederatedPSControlThread.java index 8fa0698..80418ac 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/FederatedPSControlThread.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/FederatedPSControlThread.java @@ -52,6 +52,7 @@ import java.util.stream.Collectors; import static org.apache.sysds.runtime.util.ProgramConverter.*; public class FederatedPSControlThread extends PSWorker implements Callable<Void> { + private static final long serialVersionUID = 6846648059569648791L; FederatedData _featuresData; FederatedData _labelsData; final long _batchCounterVarID; @@ -140,6 +141,7 @@ public class FederatedPSControlThread extends PSWorker implements Callable<Void> * Setup UDF executed on the federated worker */ private static class setupFederatedWorker extends FederatedUDF { + private static final long serialVersionUID = -3148991224792675607L; long _batchSize; long _dataSize; long _numBatches; @@ -209,6 +211,8 @@ public class FederatedPSControlThread extends PSWorker implements Callable<Void> * Teardown UDF executed on the federated worker */ private static class teardownFederatedWorker extends FederatedUDF { + private static final long serialVersionUID = -153650281873318969L; + protected teardownFederatedWorker() { super(new long[]{}); } @@ -326,6 +330,8 @@ public class FederatedPSControlThread extends PSWorker implements Callable<Void> * This is the code that will be executed on the federated Worker when computing a single batch */ private static class federatedComputeBatchGradients extends FederatedUDF { + private static final long serialVersionUID = -3652112393963053475L; + protected federatedComputeBatchGradients(long[] inIDs) { super(inIDs); } @@ -438,6 +444,8 @@ public class FederatedPSControlThread extends PSWorker implements Callable<Void> * This is the code that will be executed on the federated Worker when computing one epoch */ private static class federatedComputeEpochGradients extends FederatedUDF { + private static final long serialVersionUID = -3075901536748794832L; + protected federatedComputeEpochGradients(long[] inIDs) { super(inIDs); } diff --git a/src/main/java/org/apache/sysds/runtime/privacy/finegrained/FineGrainedPrivacyList.java b/src/main/java/org/apache/sysds/runtime/privacy/finegrained/FineGrainedPrivacyList.java index 54cb639..2fb1297 100644 --- a/src/main/java/org/apache/sysds/runtime/privacy/finegrained/FineGrainedPrivacyList.java +++ b/src/main/java/org/apache/sysds/runtime/privacy/finegrained/FineGrainedPrivacyList.java @@ -165,21 +165,13 @@ public class FineGrainedPrivacyList implements FineGrainedPrivacy { } private boolean listEquals(ArrayList<Map.Entry<DataRange,PrivacyLevel>> otherFGP){ - if ( otherFGP.size() == constraintCollection.size() ){ - for ( Map.Entry<DataRange, PrivacyLevel> constraint : constraintCollection){ - if ( !innerEquals(constraint, otherFGP) ) - return false; - } - return true; - } else return false; - } - - private boolean innerEquals(Map.Entry<DataRange, PrivacyLevel> constraint, ArrayList<Map.Entry<DataRange,PrivacyLevel>> otherFGP){ - for (Map.Entry<DataRange, PrivacyLevel> otherConstraint : otherFGP){ - if ( constraint.equals(otherConstraint) ) - return true; + if ( otherFGP.size() != constraintCollection.size() ) + return false; + for ( Map.Entry<DataRange, PrivacyLevel> constraint : constraintCollection){ + if ( !otherFGP.contains(constraint) ) + return false; } - return false; + return true; } @Override diff --git a/src/main/java/org/apache/sysds/runtime/privacy/propagation/PrivacyPropagator.java b/src/main/java/org/apache/sysds/runtime/privacy/propagation/PrivacyPropagator.java index 2776a49..49d17fa 100644 --- a/src/main/java/org/apache/sysds/runtime/privacy/propagation/PrivacyPropagator.java +++ b/src/main/java/org/apache/sysds/runtime/privacy/propagation/PrivacyPropagator.java @@ -22,7 +22,6 @@ package org.apache.sysds.runtime.privacy.propagation; import java.util.*; import org.apache.sysds.parser.DataExpression; -import org.apache.sysds.runtime.DMLRuntimeException; import org.apache.sysds.runtime.controlprogram.context.ExecutionContext; import org.apache.sysds.runtime.instructions.Instruction; import org.apache.sysds.runtime.instructions.cp.*; diff --git a/src/test/java/org/apache/sysds/test/component/paramserv/SerializationTest.java b/src/test/java/org/apache/sysds/test/component/paramserv/SerializationTest.java index bf47f19..0fd172d 100644 --- a/src/test/java/org/apache/sysds/test/component/paramserv/SerializationTest.java +++ b/src/test/java/org/apache/sysds/test/component/paramserv/SerializationTest.java @@ -27,7 +27,6 @@ import java.io.ObjectInputStream; import java.util.Arrays; import java.util.Collection; -import org.apache.sysds.runtime.DMLRuntimeException; import org.junit.Assert; import org.junit.Test; import org.apache.sysds.runtime.controlprogram.caching.MatrixObject; @@ -44,11 +43,8 @@ public class SerializationTest { private int _named; @Parameterized.Parameters - public static Collection named() { - return Arrays.asList(new Object[][] { - { 0 }, - { 1 } - }); + public static Collection<?> named() { + return Arrays.asList(new Object[][] {{ 0 }, { 1 }}); } public SerializationTest(Integer named) { diff --git a/src/test/java/org/apache/sysds/test/functions/privacy/ReadWriteTest.java b/src/test/java/org/apache/sysds/test/functions/privacy/ReadWriteTest.java index 6b0f941..dea5773 100644 --- a/src/test/java/org/apache/sysds/test/functions/privacy/ReadWriteTest.java +++ b/src/test/java/org/apache/sysds/test/functions/privacy/ReadWriteTest.java @@ -133,7 +133,7 @@ public class ReadWriteTest extends AutomatedTestBase { return a; } - private void setFineGrained(PrivacyConstraint privacyConstraint){ + private static void setFineGrained(PrivacyConstraint privacyConstraint){ FineGrainedPrivacy fgp = privacyConstraint.getFineGrainedPrivacy(); fgp.put(new DataRange(new long[]{1,2}, new long[]{5,4}), PrivacyLevel.Private); fgp.put(new DataRange(new long[]{7,1}, new long[]{9,1}), PrivacyLevel.Private); diff --git a/src/test/java/org/apache/sysds/test/functions/privacy/propagation/AppendPropagatorTest.java b/src/test/java/org/apache/sysds/test/functions/privacy/propagation/AppendPropagatorTest.java index 560db59..41edec0 100644 --- a/src/test/java/org/apache/sysds/test/functions/privacy/propagation/AppendPropagatorTest.java +++ b/src/test/java/org/apache/sysds/test/functions/privacy/propagation/AppendPropagatorTest.java @@ -19,24 +19,26 @@ package org.apache.sysds.test.functions.privacy.propagation; -import org.apache.sysds.api.DMLScript; -import org.apache.sysds.common.Types; -import org.apache.sysds.parser.DataExpression; -import org.apache.sysds.runtime.instructions.cp.*; +import org.apache.sysds.runtime.instructions.cp.Data; +import org.apache.sysds.runtime.instructions.cp.DoubleObject; +import org.apache.sysds.runtime.instructions.cp.IntObject; +import org.apache.sysds.runtime.instructions.cp.ListObject; +import org.apache.sysds.runtime.instructions.cp.ScalarObject; import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.meta.MatrixCharacteristics; import org.apache.sysds.runtime.privacy.PrivacyConstraint; import org.apache.sysds.runtime.privacy.PrivacyConstraint.PrivacyLevel; -import org.apache.sysds.runtime.privacy.PrivacyUtils; import org.apache.sysds.runtime.privacy.finegrained.DataRange; -import org.apache.sysds.runtime.privacy.finegrained.FineGrainedPrivacy; -import org.apache.sysds.runtime.privacy.propagation.*; +import org.apache.sysds.runtime.privacy.propagation.AppendPropagator; +import org.apache.sysds.runtime.privacy.propagation.CBindPropagator; +import org.apache.sysds.runtime.privacy.propagation.ListAppendPropagator; +import org.apache.sysds.runtime.privacy.propagation.ListRemovePropagator; +import org.apache.sysds.runtime.privacy.propagation.Propagator; +import org.apache.sysds.runtime.privacy.propagation.PropagatorMultiReturn; +import org.apache.sysds.runtime.privacy.propagation.RBindPropagator; import org.apache.sysds.test.AutomatedTestBase; import org.apache.sysds.test.TestConfiguration; import org.apache.sysds.test.TestUtils; -import org.apache.sysds.test.functions.federated.primitives.FederatedRCBindTest; -import org.apache.wink.json4j.JSONException; -import org.apache.wink.json4j.JSONObject; import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; @@ -46,8 +48,6 @@ import java.util.Arrays; import java.util.List; import java.util.Map; -import static org.junit.Assert.assertEquals; - public class AppendPropagatorTest extends AutomatedTestBase { private final static String TEST_DIR = "functions/privacy/"; @@ -100,23 +100,6 @@ public class AppendPropagatorTest extends AutomatedTestBase { generalOnlyRBindTest(new PrivacyConstraint(PrivacyLevel.Private), new PrivacyConstraint(PrivacyLevel.PrivateAggregation)); } - private void generalOnlyRBindTest(PrivacyConstraint constraint1, PrivacyConstraint constraint2){ - int columns = 2; - int rows1 = 4; - int rows2 = 3; - MatrixBlock inputMatrix1 = new MatrixBlock(rows1,columns,3); - MatrixBlock inputMatrix2 = new MatrixBlock(rows2,columns,4); - AppendPropagator propagator = new RBindPropagator(inputMatrix1, constraint1, inputMatrix2, constraint2); - PrivacyConstraint mergedConstraint = propagator.propagate(); - Assert.assertEquals(mergedConstraint.getPrivacyLevel(), PrivacyLevel.None); - Map<DataRange, PrivacyLevel> firstHalfPrivacy = mergedConstraint.getFineGrainedPrivacy().getPrivacyLevel( - new DataRange(new long[]{0,0}, new long[]{rows1-1,columns-1})); - firstHalfPrivacy.forEach((range,level) -> Assert.assertEquals(constraint1.getPrivacyLevel(),level)); - Map<DataRange, PrivacyLevel> secondHalfPrivacy = mergedConstraint.getFineGrainedPrivacy().getPrivacyLevel( - new DataRange(new long[]{rows1,0}, new long[]{rows1+rows2-1,columns-1})); - secondHalfPrivacy.forEach((range,level) -> Assert.assertEquals(constraint2.getPrivacyLevel(),level)); - } - @Test public void generalOnlyCBindPrivate1Test(){ generalOnlyCBindTest(new PrivacyConstraint(PrivacyLevel.Private), new PrivacyConstraint()); @@ -152,23 +135,6 @@ public class AppendPropagatorTest extends AutomatedTestBase { generalOnlyCBindTest(new PrivacyConstraint(PrivacyLevel.Private), new PrivacyConstraint(PrivacyLevel.PrivateAggregation)); } - private void generalOnlyCBindTest(PrivacyConstraint constraint1, PrivacyConstraint constraint2){ - int rows = 2; - int columns1 = 4; - int columns2 = 3; - MatrixBlock inputMatrix1 = new MatrixBlock(rows,columns1,3); - MatrixBlock inputMatrix2 = new MatrixBlock(rows,columns2,4); - AppendPropagator propagator = new CBindPropagator(inputMatrix1, constraint1, inputMatrix2, constraint2); - PrivacyConstraint mergedConstraint = propagator.propagate(); - Assert.assertEquals(mergedConstraint.getPrivacyLevel(), PrivacyLevel.None); - Map<DataRange, PrivacyLevel> firstHalfPrivacy = mergedConstraint.getFineGrainedPrivacy().getPrivacyLevel( - new DataRange(new long[]{0,0}, new long[]{rows-1,columns1-1})); - firstHalfPrivacy.forEach((range,level) -> Assert.assertEquals(constraint1.getPrivacyLevel(),level)); - Map<DataRange, PrivacyLevel> secondHalfPrivacy = mergedConstraint.getFineGrainedPrivacy().getPrivacyLevel( - new DataRange(new long[]{0,columns1}, new long[]{rows,columns1+columns2-1})); - secondHalfPrivacy.forEach((range,level) -> Assert.assertEquals(constraint2.getPrivacyLevel(),level)); - } - @Test public void generalOnlyListAppendPrivate1Test(){ generalOnlyListAppendTest(new PrivacyConstraint(PrivacyLevel.Private), new PrivacyConstraint()); @@ -204,25 +170,6 @@ public class AppendPropagatorTest extends AutomatedTestBase { generalOnlyListAppendTest(new PrivacyConstraint(PrivacyLevel.Private), new PrivacyConstraint(PrivacyLevel.PrivateAggregation)); } - private void generalOnlyListAppendTest(PrivacyConstraint constraint1, PrivacyConstraint constraint2){ - int length1 = 6; - List<Data> dataList1 = Arrays.asList(new Data[length1]); - ListObject input1 = new ListObject(dataList1); - int length2 = 11; - List<Data> dataList2 = Arrays.asList(new Data[length2]); - ListObject input2 = new ListObject(dataList2); - Propagator propagator = new ListAppendPropagator(input1, constraint1, input2, constraint2); - PrivacyConstraint mergedConstraint = propagator.propagate(); - Map<DataRange, PrivacyLevel> firstHalfPrivacy = mergedConstraint.getFineGrainedPrivacy().getPrivacyLevel( - new DataRange(new long[]{0}, new long[]{length1-1}) - ); - firstHalfPrivacy.forEach((range,level) -> Assert.assertEquals(constraint1.getPrivacyLevel(),level)); - Map<DataRange, PrivacyLevel> secondHalfPrivacy = mergedConstraint.getFineGrainedPrivacy().getPrivacyLevel( - new DataRange(new long[length1], new long[]{length1+length2-1}) - ); - secondHalfPrivacy.forEach((range,level) -> Assert.assertEquals(constraint2.getPrivacyLevel(),level)); - } - @Test public void generalOnlyListRemoveAppendPrivate1Test(){ generalOnlyListRemoveAppendTest(new PrivacyConstraint(PrivacyLevel.Private), new PrivacyConstraint(), @@ -265,27 +212,6 @@ public class AppendPropagatorTest extends AutomatedTestBase { PrivacyLevel.Private, PrivacyLevel.Private); } - private void generalOnlyListRemoveAppendTest( - PrivacyConstraint constraint1, PrivacyConstraint constraint2, PrivacyLevel expected1, PrivacyLevel expected2){ - int dataLength = 9; - List<Data> dataList = new ArrayList<>(); - for ( int i = 0; i < dataLength; i++){ - dataList.add(new DoubleObject(i)); - } - ListObject inputList = new ListObject(dataList); - - int removePositionInt = 5; - ScalarObject removePosition = new IntObject(removePositionInt); - - PropagatorMultiReturn propagator = new ListRemovePropagator(inputList, constraint1, removePosition, constraint2); - PrivacyConstraint[] mergedConstraints = propagator.propagate(); - - Assert.assertEquals(expected1, mergedConstraints[0].getPrivacyLevel()); - Assert.assertEquals(expected2, mergedConstraints[1].getPrivacyLevel()); - Assert.assertFalse("The first output constraint should have no fine-grained constraints", mergedConstraints[0].hasFineGrainedConstraints()); - Assert.assertFalse("The second output constraint should have no fine-grained constraints", mergedConstraints[1].hasFineGrainedConstraints()); - } - @Test public void finegrainedRBindPrivate1(){ PrivacyConstraint constraint1 = new PrivacyConstraint(); @@ -339,30 +265,7 @@ public class AppendPropagatorTest extends AutomatedTestBase { constraint2.getFineGrainedPrivacy().put(new DataRange(new long[]{1,0},new long[]{2,0}), PrivacyLevel.PrivateAggregation); finegrainedRBindTest(constraint1, constraint2); } - - private void finegrainedRBindTest(PrivacyConstraint constraint1, PrivacyConstraint constraint2){ - int columns = 2; - int rows1 = 4; - int rows2 = 3; - MatrixBlock inputMatrix1 = new MatrixBlock(rows1,columns,3); - MatrixBlock inputMatrix2 = new MatrixBlock(rows2,columns,4); - AppendPropagator propagator = new RBindPropagator(inputMatrix1, constraint1, inputMatrix2, constraint2); - PrivacyConstraint mergedConstraint = propagator.propagate(); - Assert.assertEquals(mergedConstraint.getPrivacyLevel(), PrivacyLevel.None); - Map<DataRange, PrivacyLevel> firstHalfPrivacy = mergedConstraint.getFineGrainedPrivacy().getPrivacyLevel( - new DataRange(new long[]{0,0}, new long[]{rows1-1,columns-1})); - constraint1.getFineGrainedPrivacy().getAllConstraintsList().forEach( - constraint -> Assert.assertTrue("Merged constraint should contain same privacy levels as input 1", - firstHalfPrivacy.containsValue(constraint.getValue())) - ); - Map<DataRange, PrivacyLevel> secondHalfPrivacy = mergedConstraint.getFineGrainedPrivacy().getPrivacyLevel( - new DataRange(new long[]{rows1,0}, new long[]{rows1+rows2-1,columns-1})); - constraint2.getFineGrainedPrivacy().getAllConstraintsList().forEach( - constraint -> Assert.assertTrue("Merged constraint should contain same privacy levels as input 2", - secondHalfPrivacy.containsValue(constraint.getValue())) - ); - } - + @Test public void finegrainedCBindPrivate1(){ PrivacyConstraint constraint1 = new PrivacyConstraint(); @@ -417,29 +320,6 @@ public class AppendPropagatorTest extends AutomatedTestBase { finegrainedCBindTest(constraint1, constraint2); } - private void finegrainedCBindTest(PrivacyConstraint constraint1, PrivacyConstraint constraint2){ - int rows = 6; - int columns1 = 4; - int columns2 = 3; - MatrixBlock inputMatrix1 = new MatrixBlock(rows,columns1,3); - MatrixBlock inputMatrix2 = new MatrixBlock(rows,columns2,4); - AppendPropagator propagator = new CBindPropagator(inputMatrix1, constraint1, inputMatrix2, constraint2); - PrivacyConstraint mergedConstraint = propagator.propagate(); - Assert.assertEquals(mergedConstraint.getPrivacyLevel(), PrivacyLevel.None); - Map<DataRange, PrivacyLevel> firstHalfPrivacy = mergedConstraint.getFineGrainedPrivacy().getPrivacyLevel( - new DataRange(new long[]{0,0}, new long[]{rows-1,columns1-1})); - constraint1.getFineGrainedPrivacy().getAllConstraintsList().forEach( - constraint -> Assert.assertTrue("Merged constraint should contain same privacy levels as input 1", - firstHalfPrivacy.containsValue(constraint.getValue())) - ); - Map<DataRange, PrivacyLevel> secondHalfPrivacy = mergedConstraint.getFineGrainedPrivacy().getPrivacyLevel( - new DataRange(new long[]{0,columns1}, new long[]{rows,columns1+columns2-1})); - constraint2.getFineGrainedPrivacy().getAllConstraintsList().forEach( - constraint -> Assert.assertTrue("Merged constraint should contain same privacy levels as input 2", - secondHalfPrivacy.containsValue(constraint.getValue())) - ); - } - @Test public void finegrainedListAppendPrivate1(){ PrivacyConstraint constraint1 = new PrivacyConstraint(); @@ -494,39 +374,12 @@ public class AppendPropagatorTest extends AutomatedTestBase { finegrainedListAppendTest(constraint1, constraint2); } - private void finegrainedListAppendTest(PrivacyConstraint constraint1, PrivacyConstraint constraint2){ - int length1 = 6; - List<Data> dataList1 = Arrays.asList(new Data[length1]); - ListObject input1 = new ListObject(dataList1); - int length2 = 11; - List<Data> dataList2 = Arrays.asList(new Data[length2]); - ListObject input2 = new ListObject(dataList2); - Propagator propagator = new ListAppendPropagator(input1, constraint1, input2, constraint2); - PrivacyConstraint mergedConstraint = propagator.propagate(); - Assert.assertEquals(mergedConstraint.getPrivacyLevel(), PrivacyLevel.None); - Map<DataRange, PrivacyLevel> firstHalfPrivacy = mergedConstraint.getFineGrainedPrivacy().getPrivacyLevel( - new DataRange(new long[]{0}, new long[]{length1-1}) - ); - constraint1.getFineGrainedPrivacy().getAllConstraintsList().forEach( - constraint -> Assert.assertTrue("Merged constraint should contain same privacy levels as input 1", - firstHalfPrivacy.containsValue(constraint.getValue())) - ); - Map<DataRange, PrivacyLevel> secondHalfPrivacy = mergedConstraint.getFineGrainedPrivacy().getPrivacyLevel( - new DataRange(new long[]{length1}, new long[]{length1+length2-1}) - ); - constraint2.getFineGrainedPrivacy().getAllConstraintsList().forEach( - constraint -> Assert.assertTrue("Merged constraint should contain same privacy levels as input 2", - secondHalfPrivacy.containsValue(constraint.getValue())) - ); - } - @Test public void testFunction(){ int dataLength = 9; List<Data> dataList = new ArrayList<>(); - for ( int i = 0; i < dataLength; i++){ + for ( int i = 0; i < dataLength; i++) dataList.add(new DoubleObject(i)); - } ListObject l = new ListObject(dataList); ListObject lCopy = l.copy(); int position = 4; @@ -591,38 +444,6 @@ public class AppendPropagatorTest extends AutomatedTestBase { finegrainedListRemoveAppendTest(constraint1, constraint2, PrivacyLevel.PrivateAggregation); } - private void finegrainedListRemoveAppendTest( - PrivacyConstraint constraint1, PrivacyConstraint constraint2, PrivacyLevel expectedOutput2){ - finegrainedListRemoveAppendTest(constraint1, constraint2, expectedOutput2, false); - } - - private void finegrainedListRemoveAppendTest( - PrivacyConstraint constraint1, PrivacyConstraint constraint2, PrivacyLevel expectedOutput2, boolean singleElementPrivacy){ - int dataLength = 9; - List<Data> dataList = new ArrayList<>(); - for ( int i = 0; i < dataLength; i++){ - dataList.add(new DoubleObject(i)); - } - ListObject inputList = new ListObject(dataList); - int removePositionInt = 5; - ScalarObject removePosition = new IntObject(removePositionInt); - PropagatorMultiReturn propagator = new ListRemovePropagator(inputList, constraint1, removePosition, constraint2); - PrivacyConstraint[] mergedConstraints = propagator.propagate(); - - if ( !singleElementPrivacy ){ - Map<DataRange, PrivacyLevel> outputPrivacy = mergedConstraints[0].getFineGrainedPrivacy().getPrivacyLevel( - new DataRange(new long[]{0}, new long[]{dataLength-1}) - ); - constraint1.getFineGrainedPrivacy().getAllConstraintsList().forEach( - constraint -> Assert.assertTrue("Merged constraint should contain same privacy levels as input 1", - outputPrivacy.containsValue(constraint.getValue())) - ); - } - - Assert.assertEquals(expectedOutput2, mergedConstraints[1].getPrivacyLevel()); - Assert.assertFalse(mergedConstraints[1].hasFineGrainedConstraints()); - } - @Test public void integrationRBindTestNoneNone(){ PrivacyConstraint pc1 = new PrivacyConstraint(PrivacyLevel.None); @@ -865,26 +686,6 @@ public class AppendPropagatorTest extends AutomatedTestBase { integrationCBindTest(constraint1, constraint2, pcExpected); } - private void integrationCBindTest(PrivacyConstraint privacyConstraint1, PrivacyConstraint privacyConstraint2, - PrivacyConstraint expectedOutput){ - TestConfiguration config = getAndLoadTestConfiguration(TEST_NAME_CBIND); - fullDMLScriptName = SCRIPT_DIR + TEST_DIR + config.getTestScript() + ".dml"; - - int cols1 = 20; - int cols2 = 30; - int rows = 10; - double[][] A = getRandomMatrix(rows, cols1, -10, 10, 0.5, 1); - double[][] B = getRandomMatrix(rows, cols2, -10, 10, 0.5, 1); - writeInputMatrixWithMTD("A", A, false, new MatrixCharacteristics(rows, cols1), privacyConstraint1); - writeInputMatrixWithMTD("B", B, false, new MatrixCharacteristics(rows, cols2), privacyConstraint2); - - programArgs = new String[]{"-nvargs", "A=" + input("A"), "B=" + input("B"), "C=" + output("C")}; - runTest(true,false,null,-1); - - PrivacyConstraint outputConstraint = getPrivacyConstraintFromMetaData("C"); - Assert.assertEquals(expectedOutput, outputConstraint); - } - @Test public void integrationStringAppendTestNoneNone(){ PrivacyConstraint pc1 = new PrivacyConstraint(PrivacyLevel.None); @@ -920,25 +721,6 @@ public class AppendPropagatorTest extends AutomatedTestBase { integrationStringAppendTest(pc1, pc2, pc2); } - private void integrationStringAppendTest(PrivacyConstraint privacyConstraint1, PrivacyConstraint privacyConstraint2, - PrivacyConstraint expectedOutput){ - TestConfiguration config = getAndLoadTestConfiguration(TEST_NAME_STRING); - fullDMLScriptName = SCRIPT_DIR + TEST_DIR + config.getTestScript() + ".dml"; - - int cols = 1; - int rows = 1; - double[][] A = getRandomMatrix(rows, cols, -10, 10, 0.5, 1); - double[][] B = getRandomMatrix(rows, cols, -10, 10, 0.5, 1); - writeInputMatrixWithMTD("A", A, false, new MatrixCharacteristics(rows, cols), privacyConstraint1); - writeInputMatrixWithMTD("B", B, false, new MatrixCharacteristics(rows, cols), privacyConstraint2); - - programArgs = new String[]{"-nvargs", "A=" + input("A"), "B=" + input("B"), "C=" + output("C")}; - runTest(true,false,null,-1); - - PrivacyConstraint outputConstraint = getPrivacyConstraintFromMetaData("C"); - Assert.assertEquals(expectedOutput, outputConstraint); - } - @Ignore @Test public void integrationListAppendTestNoneNone(){ @@ -981,6 +763,223 @@ public class AppendPropagatorTest extends AutomatedTestBase { integrationListAppendTest(pc1, pc2, pc2); } + private static void generalOnlyRBindTest(PrivacyConstraint constraint1, PrivacyConstraint constraint2){ + int columns = 2; + int rows1 = 4; + int rows2 = 3; + MatrixBlock inputMatrix1 = new MatrixBlock(rows1,columns,3); + MatrixBlock inputMatrix2 = new MatrixBlock(rows2,columns,4); + AppendPropagator propagator = new RBindPropagator(inputMatrix1, constraint1, inputMatrix2, constraint2); + PrivacyConstraint mergedConstraint = propagator.propagate(); + Assert.assertEquals(mergedConstraint.getPrivacyLevel(), PrivacyLevel.None); + Map<DataRange, PrivacyLevel> firstHalfPrivacy = mergedConstraint.getFineGrainedPrivacy().getPrivacyLevel( + new DataRange(new long[]{0,0}, new long[]{rows1-1,columns-1})); + firstHalfPrivacy.forEach((range,level) -> Assert.assertEquals(constraint1.getPrivacyLevel(),level)); + Map<DataRange, PrivacyLevel> secondHalfPrivacy = mergedConstraint.getFineGrainedPrivacy().getPrivacyLevel( + new DataRange(new long[]{rows1,0}, new long[]{rows1+rows2-1,columns-1})); + secondHalfPrivacy.forEach((range,level) -> Assert.assertEquals(constraint2.getPrivacyLevel(),level)); + } + + private static void generalOnlyCBindTest(PrivacyConstraint constraint1, PrivacyConstraint constraint2){ + int rows = 2; + int columns1 = 4; + int columns2 = 3; + MatrixBlock inputMatrix1 = new MatrixBlock(rows,columns1,3); + MatrixBlock inputMatrix2 = new MatrixBlock(rows,columns2,4); + AppendPropagator propagator = new CBindPropagator(inputMatrix1, constraint1, inputMatrix2, constraint2); + PrivacyConstraint mergedConstraint = propagator.propagate(); + Assert.assertEquals(mergedConstraint.getPrivacyLevel(), PrivacyLevel.None); + Map<DataRange, PrivacyLevel> firstHalfPrivacy = mergedConstraint.getFineGrainedPrivacy().getPrivacyLevel( + new DataRange(new long[]{0,0}, new long[]{rows-1,columns1-1})); + firstHalfPrivacy.forEach((range,level) -> Assert.assertEquals(constraint1.getPrivacyLevel(),level)); + Map<DataRange, PrivacyLevel> secondHalfPrivacy = mergedConstraint.getFineGrainedPrivacy().getPrivacyLevel( + new DataRange(new long[]{0,columns1}, new long[]{rows,columns1+columns2-1})); + secondHalfPrivacy.forEach((range,level) -> Assert.assertEquals(constraint2.getPrivacyLevel(),level)); + } + + private static void generalOnlyListAppendTest(PrivacyConstraint constraint1, PrivacyConstraint constraint2){ + int length1 = 6; + List<Data> dataList1 = Arrays.asList(new Data[length1]); + ListObject input1 = new ListObject(dataList1); + int length2 = 11; + List<Data> dataList2 = Arrays.asList(new Data[length2]); + ListObject input2 = new ListObject(dataList2); + Propagator propagator = new ListAppendPropagator(input1, constraint1, input2, constraint2); + PrivacyConstraint mergedConstraint = propagator.propagate(); + Map<DataRange, PrivacyLevel> firstHalfPrivacy = mergedConstraint.getFineGrainedPrivacy().getPrivacyLevel( + new DataRange(new long[]{0}, new long[]{length1-1}) + ); + firstHalfPrivacy.forEach((range,level) -> Assert.assertEquals(constraint1.getPrivacyLevel(),level)); + Map<DataRange, PrivacyLevel> secondHalfPrivacy = mergedConstraint.getFineGrainedPrivacy().getPrivacyLevel( + new DataRange(new long[length1], new long[]{length1+length2-1}) + ); + secondHalfPrivacy.forEach((range,level) -> Assert.assertEquals(constraint2.getPrivacyLevel(),level)); + } + + private static void generalOnlyListRemoveAppendTest( + PrivacyConstraint constraint1, PrivacyConstraint constraint2, PrivacyLevel expected1, PrivacyLevel expected2){ + int dataLength = 9; + List<Data> dataList = new ArrayList<>(); + for ( int i = 0; i < dataLength; i++){ + dataList.add(new DoubleObject(i)); + } + ListObject inputList = new ListObject(dataList); + + int removePositionInt = 5; + ScalarObject removePosition = new IntObject(removePositionInt); + + PropagatorMultiReturn propagator = new ListRemovePropagator(inputList, constraint1, removePosition, constraint2); + PrivacyConstraint[] mergedConstraints = propagator.propagate(); + + Assert.assertEquals(expected1, mergedConstraints[0].getPrivacyLevel()); + Assert.assertEquals(expected2, mergedConstraints[1].getPrivacyLevel()); + Assert.assertFalse("The first output constraint should have no fine-grained constraints", mergedConstraints[0].hasFineGrainedConstraints()); + Assert.assertFalse("The second output constraint should have no fine-grained constraints", mergedConstraints[1].hasFineGrainedConstraints()); + } + + private static void finegrainedRBindTest(PrivacyConstraint constraint1, PrivacyConstraint constraint2){ + int columns = 2; + int rows1 = 4; + int rows2 = 3; + MatrixBlock inputMatrix1 = new MatrixBlock(rows1,columns,3); + MatrixBlock inputMatrix2 = new MatrixBlock(rows2,columns,4); + AppendPropagator propagator = new RBindPropagator(inputMatrix1, constraint1, inputMatrix2, constraint2); + PrivacyConstraint mergedConstraint = propagator.propagate(); + Assert.assertEquals(mergedConstraint.getPrivacyLevel(), PrivacyLevel.None); + Map<DataRange, PrivacyLevel> firstHalfPrivacy = mergedConstraint.getFineGrainedPrivacy().getPrivacyLevel( + new DataRange(new long[]{0,0}, new long[]{rows1-1,columns-1})); + constraint1.getFineGrainedPrivacy().getAllConstraintsList().forEach( + constraint -> Assert.assertTrue("Merged constraint should contain same privacy levels as input 1", + firstHalfPrivacy.containsValue(constraint.getValue())) + ); + Map<DataRange, PrivacyLevel> secondHalfPrivacy = mergedConstraint.getFineGrainedPrivacy().getPrivacyLevel( + new DataRange(new long[]{rows1,0}, new long[]{rows1+rows2-1,columns-1})); + constraint2.getFineGrainedPrivacy().getAllConstraintsList().forEach( + constraint -> Assert.assertTrue("Merged constraint should contain same privacy levels as input 2", + secondHalfPrivacy.containsValue(constraint.getValue())) + ); + } + + private static void finegrainedCBindTest(PrivacyConstraint constraint1, PrivacyConstraint constraint2){ + int rows = 6; + int columns1 = 4; + int columns2 = 3; + MatrixBlock inputMatrix1 = new MatrixBlock(rows,columns1,3); + MatrixBlock inputMatrix2 = new MatrixBlock(rows,columns2,4); + AppendPropagator propagator = new CBindPropagator(inputMatrix1, constraint1, inputMatrix2, constraint2); + PrivacyConstraint mergedConstraint = propagator.propagate(); + Assert.assertEquals(mergedConstraint.getPrivacyLevel(), PrivacyLevel.None); + Map<DataRange, PrivacyLevel> firstHalfPrivacy = mergedConstraint.getFineGrainedPrivacy().getPrivacyLevel( + new DataRange(new long[]{0,0}, new long[]{rows-1,columns1-1})); + constraint1.getFineGrainedPrivacy().getAllConstraintsList().forEach( + constraint -> Assert.assertTrue("Merged constraint should contain same privacy levels as input 1", + firstHalfPrivacy.containsValue(constraint.getValue())) + ); + Map<DataRange, PrivacyLevel> secondHalfPrivacy = mergedConstraint.getFineGrainedPrivacy().getPrivacyLevel( + new DataRange(new long[]{0,columns1}, new long[]{rows,columns1+columns2-1})); + constraint2.getFineGrainedPrivacy().getAllConstraintsList().forEach( + constraint -> Assert.assertTrue("Merged constraint should contain same privacy levels as input 2", + secondHalfPrivacy.containsValue(constraint.getValue())) + ); + } + + private static void finegrainedListAppendTest(PrivacyConstraint constraint1, PrivacyConstraint constraint2){ + int length1 = 6; + List<Data> dataList1 = Arrays.asList(new Data[length1]); + ListObject input1 = new ListObject(dataList1); + int length2 = 11; + List<Data> dataList2 = Arrays.asList(new Data[length2]); + ListObject input2 = new ListObject(dataList2); + Propagator propagator = new ListAppendPropagator(input1, constraint1, input2, constraint2); + PrivacyConstraint mergedConstraint = propagator.propagate(); + Assert.assertEquals(mergedConstraint.getPrivacyLevel(), PrivacyLevel.None); + Map<DataRange, PrivacyLevel> firstHalfPrivacy = mergedConstraint.getFineGrainedPrivacy().getPrivacyLevel( + new DataRange(new long[]{0}, new long[]{length1-1}) + ); + constraint1.getFineGrainedPrivacy().getAllConstraintsList().forEach( + constraint -> Assert.assertTrue("Merged constraint should contain same privacy levels as input 1", + firstHalfPrivacy.containsValue(constraint.getValue())) + ); + Map<DataRange, PrivacyLevel> secondHalfPrivacy = mergedConstraint.getFineGrainedPrivacy().getPrivacyLevel( + new DataRange(new long[]{length1}, new long[]{length1+length2-1}) + ); + constraint2.getFineGrainedPrivacy().getAllConstraintsList().forEach( + constraint -> Assert.assertTrue("Merged constraint should contain same privacy levels as input 2", + secondHalfPrivacy.containsValue(constraint.getValue())) + ); + } + + private static void finegrainedListRemoveAppendTest( + PrivacyConstraint constraint1, PrivacyConstraint constraint2, PrivacyLevel expectedOutput2){ + finegrainedListRemoveAppendTest(constraint1, constraint2, expectedOutput2, false); + } + + private static void finegrainedListRemoveAppendTest( + PrivacyConstraint constraint1, PrivacyConstraint constraint2, PrivacyLevel expectedOutput2, boolean singleElementPrivacy){ + int dataLength = 9; + List<Data> dataList = new ArrayList<>(); + for ( int i = 0; i < dataLength; i++){ + dataList.add(new DoubleObject(i)); + } + ListObject inputList = new ListObject(dataList); + int removePositionInt = 5; + ScalarObject removePosition = new IntObject(removePositionInt); + PropagatorMultiReturn propagator = new ListRemovePropagator(inputList, constraint1, removePosition, constraint2); + PrivacyConstraint[] mergedConstraints = propagator.propagate(); + + if ( !singleElementPrivacy ){ + Map<DataRange, PrivacyLevel> outputPrivacy = mergedConstraints[0].getFineGrainedPrivacy().getPrivacyLevel( + new DataRange(new long[]{0}, new long[]{dataLength-1}) + ); + constraint1.getFineGrainedPrivacy().getAllConstraintsList().forEach( + constraint -> Assert.assertTrue("Merged constraint should contain same privacy levels as input 1", + outputPrivacy.containsValue(constraint.getValue())) + ); + } + + Assert.assertEquals(expectedOutput2, mergedConstraints[1].getPrivacyLevel()); + Assert.assertFalse(mergedConstraints[1].hasFineGrainedConstraints()); + } + + private void integrationCBindTest(PrivacyConstraint privacyConstraint1, PrivacyConstraint privacyConstraint2, + PrivacyConstraint expectedOutput){ + TestConfiguration config = getAndLoadTestConfiguration(TEST_NAME_CBIND); + fullDMLScriptName = SCRIPT_DIR + TEST_DIR + config.getTestScript() + ".dml"; + + int cols1 = 20; + int cols2 = 30; + int rows = 10; + double[][] A = getRandomMatrix(rows, cols1, -10, 10, 0.5, 1); + double[][] B = getRandomMatrix(rows, cols2, -10, 10, 0.5, 1); + writeInputMatrixWithMTD("A", A, false, new MatrixCharacteristics(rows, cols1), privacyConstraint1); + writeInputMatrixWithMTD("B", B, false, new MatrixCharacteristics(rows, cols2), privacyConstraint2); + + programArgs = new String[]{"-nvargs", "A=" + input("A"), "B=" + input("B"), "C=" + output("C")}; + runTest(true,false,null,-1); + + PrivacyConstraint outputConstraint = getPrivacyConstraintFromMetaData("C"); + Assert.assertEquals(expectedOutput, outputConstraint); + } + + private void integrationStringAppendTest(PrivacyConstraint privacyConstraint1, PrivacyConstraint privacyConstraint2, + PrivacyConstraint expectedOutput){ + TestConfiguration config = getAndLoadTestConfiguration(TEST_NAME_STRING); + fullDMLScriptName = SCRIPT_DIR + TEST_DIR + config.getTestScript() + ".dml"; + + int cols = 1; + int rows = 1; + double[][] A = getRandomMatrix(rows, cols, -10, 10, 0.5, 1); + double[][] B = getRandomMatrix(rows, cols, -10, 10, 0.5, 1); + writeInputMatrixWithMTD("A", A, false, new MatrixCharacteristics(rows, cols), privacyConstraint1); + writeInputMatrixWithMTD("B", B, false, new MatrixCharacteristics(rows, cols), privacyConstraint2); + + programArgs = new String[]{"-nvargs", "A=" + input("A"), "B=" + input("B"), "C=" + output("C")}; + runTest(true,false,null,-1); + + PrivacyConstraint outputConstraint = getPrivacyConstraintFromMetaData("C"); + Assert.assertEquals(expectedOutput, outputConstraint); + } + private void integrationListAppendTest(PrivacyConstraint privacyConstraint1, PrivacyConstraint privacyConstraint2, PrivacyConstraint expectedOutput){ TestConfiguration config = getAndLoadTestConfiguration(TEST_NAME_LIST);