This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/master by this push:
new f308d56 [MINOR] Cleanup warnings (imports, serial IDs, static,
formatting)
f308d56 is described below
commit f308d56c560df52785e52456ece60ac74de49672
Author: Matthias Boehm <[email protected]>
AuthorDate: Sat Oct 31 15:48:39 2020 +0100
[MINOR] Cleanup warnings (imports, serial IDs, static, formatting)
---
.../runtime/compress/colgroup/ColGroupValue.java | 2 +-
.../sysds/runtime/compress/lib/LibLeftMultBy.java | 1002 ++++++++--------
.../sysds/runtime/compress/lib/LibRightMultBy.java | 1198 ++++++++++----------
.../sysds/runtime/compress/lib/LibScalar.java | 288 ++---
.../paramserv/FederatedPSControlThread.java | 8 +
.../finegrained/FineGrainedPrivacyList.java | 20 +-
.../privacy/propagation/PrivacyPropagator.java | 1 -
.../component/paramserv/SerializationTest.java | 8 +-
.../test/functions/privacy/ReadWriteTest.java | 2 +-
.../privacy/propagation/AppendPropagatorTest.java | 463 ++++----
10 files changed, 1493 insertions(+), 1499 deletions(-)
diff --git
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupValue.java
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupValue.java
index 21dfa85..ab06509 100644
---
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupValue.java
+++
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupValue.java
@@ -323,7 +323,7 @@ public abstract class ColGroupValue extends ColGroup
implements Cloneable {
return ret;
}
- protected final double[] sparsePreaggValues(int numVals, double v,
boolean allocNew, double[] dictVals) {
+ protected static double[] sparsePreaggValues(int numVals, double v,
boolean allocNew, double[] dictVals) {
double[] ret = allocNew ? new double[numVals + 1] :
allocDVector(numVals + 1, true);
for(int k = 0; k < numVals; k++)
diff --git
a/src/main/java/org/apache/sysds/runtime/compress/lib/LibLeftMultBy.java
b/src/main/java/org/apache/sysds/runtime/compress/lib/LibLeftMultBy.java
index 004e601..fd5a084 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/lib/LibLeftMultBy.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/lib/LibLeftMultBy.java
@@ -45,505 +45,505 @@ import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.util.CommonThreadPool;
public class LibLeftMultBy {
- private static final Log LOG =
LogFactory.getLog(LibLeftMultBy.class.getName());
-
- public static MatrixBlock leftMultByMatrix(List<ColGroup> groups,
MatrixBlock that, MatrixBlock ret,
- boolean doTranspose, boolean allocTmp, int rl, int cl, boolean
overlapping, int k, Pair<Integer, int[]> v) {
-
- if(ret == null)
- ret = new MatrixBlock(rl, cl, false, rl * cl);
- else if(!(ret.getNumColumns() == cl && ret.getNumRows() == rl &&
ret.isAllocated()))
- ret.reset(rl, cl, false, rl * cl);
- that = that instanceof CompressedMatrixBlock ?
((CompressedMatrixBlock) that).decompress() : that;
-
- // if(that.getNumRows() == 1) {
- // if(k > 1) {
- // return leftMultByVectorTranspose(groups, that, ret, doTranspose, k,
v, overlapping);
- // }
- // else {
- // return leftMultByVectorTranspose(groups, that, ret, doTranspose,
true, v, overlapping);
- // }
- // }
- // else {
- return leftMultByMatrix(groups, that, ret, k, cl, v, overlapping);
- // }
- }
-
- public static void leftMultByTransposeSelf(List<ColGroup> groups,
MatrixBlock result, int gl, int gu, int k,
- int numColumns, Pair<Integer, int[]> v, boolean overlapping) {
- if(k <= 1 || overlapping) {
- leftMultByTransposeSelf(groups, result, gl, gu, v, overlapping);
- }
- else {
- try {
- ExecutorService pool = CommonThreadPool.get(k);
- ArrayList<MatrixMultTransposeTask> tasks = new ArrayList<>();
- int numgrp = groups.size();
- int blklen = (int) (Math.ceil((double) numgrp / (2 * k)));
- for(int i = 0; i < 2 * k & i * blklen < numColumns; i++)
- tasks.add(new MatrixMultTransposeTask(groups, result, i *
blklen,
- Math.min((i + 1) * blklen, numgrp), v, overlapping));
- List<Future<Object>> ret = pool.invokeAll(tasks);
- for(Future<Object> tret : ret)
- tret.get(); // check for errors
- pool.shutdown();
- }
- catch(InterruptedException | ExecutionException e) {
- throw new DMLRuntimeException(e);
- }
- }
- }
-
- private static MatrixBlock leftMultByMatrix(List<ColGroup> colGroups,
MatrixBlock that, MatrixBlock ret, int k,
- int numColumns, Pair<Integer, int[]> v, boolean overlapping) {
- ret.allocateDenseBlock();
- if(that.isInSparseFormat()) {
- ret = leftMultBySparseMatrix(colGroups, that, ret, k, numColumns,
v);
- }
- else {
- ret = leftMultByDenseMatrix(colGroups, that, ret, k, numColumns,
v, overlapping);
- }
-
- ret.setNonZeros(ret.getNumColumns() * ret.getNumRows());
- return ret;
- }
-
- private static MatrixBlock leftMultByDenseMatrix(List<ColGroup> colGroups,
MatrixBlock that, MatrixBlock ret, int k,
- int numColumns, Pair<Integer, int[]> v, boolean overlapping) {
- DenseBlock db = that.getDenseBlock();
- if(db == null)
- throw new DMLRuntimeException("Invalid LeftMult By Dense matrix,
input matrix was sparse");
-
- double[] retV = ret.getDenseBlockValues();
- double[] thatV;
- int blockU;
- int blockL = 0;
- for(ColGroup grp : colGroups)
- if(grp instanceof ColGroupUncompressed)
- ((ColGroupUncompressed) grp).leftMultByMatrix(that, ret);
-
- for(int b = 0; b < db.numBlocks(); b++) {
- int blockSize = db.blockSize(b);
- blockU = Math.min(blockL + blockSize, ret.getNumRows());
- thatV = db.valuesAt(b);
-
- if(k == 1 || overlapping) {
- // Pair<Integer, int[]> v = getMaxNumValues(colGroups);
- for(int j = 0; j < colGroups.size(); j++) {
- colGroups.get(j).leftMultByMatrix(thatV,
- retV,
- colGroups.get(j).getValues(),
- that.getNumRows(),
- ret.getNumColumns(),
- 0,
- ret.getNumRows(),
- 0);
- }
- }
- else {
- try {
- ExecutorService pool = CommonThreadPool.get(k);
- // compute remaining compressed column groups in parallel
- ArrayList<LeftMatrixMatrixMultTask> tasks = new
ArrayList<>();
- int rowBlockSize = 1;
- for(int blo = blockL; blo < blockU; blo += rowBlockSize) {
- tasks.add(new LeftMatrixMatrixMultTask(colGroups,
thatV, retV, that.getNumRows(), numColumns,
- blo, Math.min(blo + rowBlockSize, blockU), blo -
blockL, v));
- }
-
- List<Future<Object>> futures = pool.invokeAll(tasks);
-
- pool.shutdown();
- for(Future<Object> future : futures)
- future.get();
- }
- catch(InterruptedException | ExecutionException e) {
- throw new DMLRuntimeException(e);
- }
- }
- blockL += blockSize;
- }
- return ret;
- }
-
- private static MatrixBlock leftMultByVectorTranspose(List<ColGroup>
colGroups, MatrixBlock vector,
- MatrixBlock result, boolean doTranspose, boolean allocTmp,
Pair<Integer, int[]> v, boolean overlap) {
-
- MatrixBlock rowVector = vector;
- // Note that transpose here is a metadata operation since the input is
a vector.
- if(doTranspose) {
- rowVector = new MatrixBlock(1, vector.getNumRows(), false);
- LibMatrixReorg.transpose(vector, rowVector);
- }
-
- // initialize and allocate the result
- result.reset();
- result.allocateDenseBlock();
-
- // setup memory pool for reuse
- if(allocTmp) {
- // Pair<Integer, int[]> v = getMaxNumValues(colGroups);
- ColGroupValue.setupThreadLocalMemory(v.getLeft() + 1); // +1 for
efficiency in DDC groups.
- for(int i = 0; i < colGroups.size(); i++) {
-
colGroups.get(i).leftMultByRowVector(rowVector.getDenseBlockValues(),
- result.getDenseBlockValues(),
- v.getRight()[i]);
- }
- }
- else {
-
- for(ColGroup grp : colGroups) {
- grp.leftMultByRowVector(rowVector.getDenseBlockValues(),
result.getDenseBlockValues(), -1);
- }
- }
-
- // delegate matrix-vector operation to each column group
-
- // post-processing
- if(allocTmp)
- ColGroupValue.cleanupThreadLocalMemory();
- result.recomputeNonZeros();
-
- return result;
- }
-
- public static MatrixBlock leftMultByVectorTranspose(List<ColGroup>
colGroups, MatrixBlock vector,
- MatrixBlock result, boolean doTranspose, int k, Pair<Integer, int[]>
v, boolean overlap) {
- // transpose vector if required
- MatrixBlock rowVector = vector;
- if(doTranspose) {
- rowVector = new MatrixBlock(1, vector.getNumRows(), false);
- LibMatrixReorg.transpose(vector, rowVector);
- }
-
- // initialize and allocate the result
- result.reset();
- result.allocateDenseBlock();
-
- // multi-threaded execution
- try {
- // compute uncompressed column group in parallel
- // ColGroupUncompressed uc = getUncompressedColGroup();
- // if(uc != null)
- // uc.leftMultByRowVector(rowVector, result, k);
-
- // compute remaining compressed column groups in parallel
- ExecutorService pool =
CommonThreadPool.get(Math.min(colGroups.size(), k));
- ArrayList<LeftMatrixVectorMultTask> tasks = new ArrayList<>();
-
- // if(overlap){
- tasks.add(new LeftMatrixVectorMultTask(colGroups, rowVector,
result, v));
- // } else{
- // ArrayList<ColGroup>[] grpParts =
createStaticTaskPartitioning(colGroups, 4 * k, true);
- // for(ArrayList<ColGroup> groups : grpParts)
- // tasks.add(new LeftMatrixVectorMultTask(groups, rowVector,
result, v));
- // }
-
- List<Future<Object>> ret = pool.invokeAll(tasks);
- pool.shutdown();
- for(Future<Object> tmp : ret)
- tmp.get();
-
- }
- catch(InterruptedException | ExecutionException e) {
- LOG.error(e);
- throw new DMLRuntimeException(e);
- }
-
- // post-processing
- result.recomputeNonZeros();
-
- return result;
- }
-
- private static MatrixBlock leftMultBySparseMatrix(List<ColGroup>
colGroups, MatrixBlock that, MatrixBlock ret,
- int k, int numColumns, Pair<Integer, int[]> v) {
-
- SparseBlock sb = that.getSparseBlock();
- if(sb == null)
- throw new DMLRuntimeException("Invalid Left Mult by Sparse matrix,
input matrix was dense");
-
- for(ColGroup grp : colGroups) {
- if(grp instanceof ColGroupUncompressed)
- ((ColGroupUncompressed) grp).leftMultByMatrix(that, ret);
- }
-
- if(k == 1) {
- double[][] materialized = new double[colGroups.size()][];
- boolean containsOLE = false;
- for(int i = 0; i < colGroups.size(); i++) {
- materialized[i] = colGroups.get(i).getValues();
- if(colGroups.get(i) instanceof ColGroupOLE) {
- containsOLE = true;
- }
- }
- double[] materializedRow = containsOLE ? new
double[CompressionSettings.BITMAP_BLOCK_SZ * 2] : null;
-
- for(int r = 0; r < that.getNumRows(); r++) {
- SparseRow row = sb.get(r);
- if(row != null) {
-
- for(int j = 0; j < colGroups.size(); j++) {
- colGroups.get(j).leftMultBySparseMatrix(row.size(),
- row.indexes(),
- row.values(),
- ret.getDenseBlockValues(),
- v.getRight()[j],
- materialized[j],
- that.getNumRows(),
- ret.getNumColumns(),
- r,
- materializedRow);
- }
- }
- }
- }
- else {
- ExecutorService pool = CommonThreadPool.get(k);
- ArrayList<LeftMatrixSparseMatrixMultTask> tasks = new
ArrayList<>();
- try {
- // compute remaining compressed column groups in parallel
- // List<ColGroup>[] parts =
createStaticTaskPartitioningForSparseMatrixMult(colGroups, k, false);
- // for(List<ColGroup> part : parts) {
- tasks.add(new LeftMatrixSparseMatrixMultTask(colGroups, sb,
ret.getDenseBlockValues(),
- that.getNumRows(), numColumns, v));
- // }
-
- List<Future<Object>> futures = pool.invokeAll(tasks);
- pool.shutdown();
- for(Future<Object> future : futures)
- future.get();
- }
- catch(InterruptedException | ExecutionException e) {
- throw new DMLRuntimeException(e);
- }
- }
-
- return ret;
-
- }
-
- private static void leftMultByTransposeSelf(List<ColGroup> groups,
MatrixBlock result, int gl, int gu,
- Pair<Integer, int[]> v, boolean overlapping) {
- final int numRows = groups.get(0).getNumRows();
-
- // preallocated dense tmp matrix blocks
- MatrixBlock lhs = new MatrixBlock(1, numRows, false);
- MatrixBlock tmpret = new MatrixBlock(1, result.getNumColumns(), false);
- lhs.allocateDenseBlock();
- tmpret.allocateDenseBlock();
-
- // setup memory pool for reuse
- ColGroupValue.setupThreadLocalMemory(v.getLeft() + 1);
-
- // approach: for each colgroup, extract uncompressed columns one
at-a-time
- // vector-matrix multiplies against remaining col groups
- // for(int i = gl; i < gu; i++) {
- // get current group and relevant col groups
- // ColGroup group = groups.get(i);
- // int[] ixgroup = group.getColIndices();
- // List<ColGroup> tmpList = groups.subList(i, numGroups);
-
- // if(group instanceof ColGroupDDC // single DDC group
- // && ixgroup.length == 1 && !containsUC && numRows <
CompressionSettings.BITMAP_BLOCK_SZ) {
- // // compute vector-matrix partial result
- // leftMultByVectorTranspose(tmpList, (ColGroupDDC) group, tmpret);
-
- // // write partial results (disjoint non-zeros)
- // LinearAlgebraUtils.copyNonZerosToUpperTriangle(result, tmpret,
ixgroup[0]);
- // }
- // else {
- // for all uncompressed lhs columns vectors
- for(int j = 0; j < result.getNumColumns(); j++) {
- ColGroup.decompressToBlock(lhs, j, groups);
-
- if(!lhs.isEmptyBlock(false)) {
- // tmpret.reset();
- // compute vector-matrix partial result
- // leftMultByMatrix(groups,lhs, tmpret, false, true, 0, 0,
overlapping, 1, v );
- leftMultByVectorTranspose(groups, lhs, tmpret, false, true, v,
overlapping);
- // LOG.error(tmpret);
-
- // write partial results (disjoint non-zeros)
- LinearAlgebraUtils.copyNonZerosToUpperTriangle(result, tmpret,
j);
- }
- lhs.reset();
- // }
- // }
- }
-
- // post processing
- ColGroupValue.cleanupThreadLocalMemory();
- }
-
- private static class LeftMatrixVectorMultTask implements Callable<Object> {
- private final List<ColGroup> _groups;
- private final MatrixBlock _vect;
- private final MatrixBlock _ret;
- private final Pair<Integer, int[]> _v;
-
- protected LeftMatrixVectorMultTask(List<ColGroup> groups, MatrixBlock
vect, MatrixBlock ret,
- Pair<Integer, int[]> v) {
- _groups = groups;
- _vect = vect;
- _ret = ret;
- _v = v;
- }
-
- @Override
- public Object call() {
- // setup memory pool for reuse
- try {
- ColGroupValue.setupThreadLocalMemory(_v.getLeft() + 1);
- for(int i = 0; i < _groups.size(); i++) {
- _groups.get(i)
- .leftMultByRowVector(_vect.getDenseBlockValues(),
_ret.getDenseBlockValues(), _v.getRight()[i]);
- }
-
- ColGroupValue.cleanupThreadLocalMemory();
- }
- catch(Exception e) {
- throw new DMLRuntimeException(e);
- }
- return null;
- }
- }
-
- private static class LeftMatrixMatrixMultTask implements Callable<Object> {
- private final List<ColGroup> _group;
- private final double[] _that;
- private final double[] _ret;
- private final int _numRows;
- private final int _numCols;
- private final int _rl;
- private final int _ru;
- private final int _vOff;
- private final Pair<Integer, int[]> _v;
-
- protected LeftMatrixMatrixMultTask(List<ColGroup> group, double[]
that, double[] ret, int numRows, int numCols,
- int rl, int ru, int vOff, Pair<Integer, int[]> v) {
- _group = group;
- _that = that;
- _ret = ret;
- _numRows = numRows;
- _numCols = numCols;
- _rl = rl;
- _ru = ru;
- _vOff = vOff;
- _v = v;
- }
-
- @Override
- public Object call() {
- // setup memory pool for reuse
-
- double[][] materialized = new double[_group.size()][];
- for(int i = 0; i < _group.size(); i++) {
- materialized[i] = _group.get(i).getValues();
- }
- // Pair<Integer, int[]> v = getMaxNumValues(_group);
- try {
- ColGroupValue.setupThreadLocalMemory(_v.getLeft() + 1);
- for(int j = 0; j < _group.size(); j++) {
- _group.get(j).leftMultByMatrix(_that, _ret,
materialized[j], _numRows, _numCols, _rl, _ru, _vOff);
- }
- ColGroupValue.cleanupThreadLocalMemory();
-
- }
- catch(Exception e) {
- throw new DMLRuntimeException(e);
- }
- return null;
- }
- }
-
- private static class LeftMatrixSparseMatrixMultTask implements
Callable<Object> {
- private final List<ColGroup> _group;
- private final SparseBlock _that;
- private final double[] _ret;
- private final int _numRows;
- private final int _numCols;
- private final Pair<Integer, int[]> _v;
-
- protected LeftMatrixSparseMatrixMultTask(List<ColGroup> group,
SparseBlock that, double[] ret, int numRows,
- int numCols, Pair<Integer, int[]> v) {
- _group = group;
- _that = that;
- _ret = ret;
- _numRows = numRows;
- _numCols = numCols;
- _v = v;
- }
-
- @Override
- public Object call() {
- // setup memory pool for reuse
-
- // double[][] materialized = new double[_group.size()][];
- // for(int i = 0; i < _group.size(); i++) {
- // materialized[i] = _group.get(i).getValues();
- // }
-
- boolean containsOLE = false;
- for(int j = 0; j < _group.size(); j++) {
- if(_group.get(j) instanceof ColGroupOLE) {
- containsOLE = true;
- }
- }
- // Temporary Array to store 2 * block size in
- double[] tmpA = containsOLE ? new
double[CompressionSettings.BITMAP_BLOCK_SZ * 2] : null;
-
- ColGroupValue.setupThreadLocalMemory(_v.getLeft());
- try {
- for(int j = 0; j < _group.size(); j++) {
- double[] materializedV = _group.get(j).getValues();
- for(int r = 0; r < _that.numRows(); r++) {
- if(_that.get(r) != null) {
-
_group.get(j).leftMultBySparseMatrix(_that.get(r).size(),
- _that.get(r).indexes(),
- _that.get(r).values(),
- _ret,
- _v.getRight()[j],
- materializedV,
- _numRows,
- _numCols,
- r,
- tmpA);
- }
- }
- }
- }
- catch(Exception e) {
- e.printStackTrace();
- throw new DMLRuntimeException(e);
- }
- ColGroupValue.cleanupThreadLocalMemory();
- return null;
- }
- }
-
- private static class MatrixMultTransposeTask implements Callable<Object> {
- private final List<ColGroup> _groups;
- private final MatrixBlock _ret;
- private final int _gl;
- private final int _gu;
- private final Pair<Integer, int[]> _v;
- private final boolean _overlapping;
-
- protected MatrixMultTransposeTask(List<ColGroup> groups, MatrixBlock
ret, int gl, int gu,
- Pair<Integer, int[]> v, boolean overlapping) {
- _groups = groups;
- _ret = ret;
- _gl = gl;
- _gu = gu;
- _v = v;
- _overlapping = overlapping;
- }
-
- @Override
- public Object call() {
- leftMultByTransposeSelf(_groups, _ret, _gl, _gu, _v, _overlapping);
- return null;
- }
- }
+ private static final Log LOG =
LogFactory.getLog(LibLeftMultBy.class.getName());
+
+ public static MatrixBlock leftMultByMatrix(List<ColGroup> groups,
MatrixBlock that, MatrixBlock ret,
+ boolean doTranspose, boolean allocTmp, int rl, int cl, boolean
overlapping, int k, Pair<Integer, int[]> v) {
+
+ if(ret == null)
+ ret = new MatrixBlock(rl, cl, false, rl * cl);
+ else if(!(ret.getNumColumns() == cl && ret.getNumRows() == rl
&& ret.isAllocated()))
+ ret.reset(rl, cl, false, rl * cl);
+ that = that instanceof CompressedMatrixBlock ?
((CompressedMatrixBlock) that).decompress() : that;
+
+ // if(that.getNumRows() == 1) {
+ // if(k > 1) {
+ // return leftMultByVectorTranspose(groups, that, ret,
doTranspose, k, v, overlapping);
+ // }
+ // else {
+ // return leftMultByVectorTranspose(groups, that, ret,
doTranspose, true, v, overlapping);
+ // }
+ // }
+ // else {
+ return leftMultByMatrix(groups, that, ret, k, cl, v,
overlapping);
+ // }
+ }
+
+ public static void leftMultByTransposeSelf(List<ColGroup> groups,
MatrixBlock result, int gl, int gu, int k,
+ int numColumns, Pair<Integer, int[]> v, boolean overlapping) {
+ if(k <= 1 || overlapping) {
+ leftMultByTransposeSelf(groups, result, gl, gu, v,
overlapping);
+ }
+ else {
+ try {
+ ExecutorService pool = CommonThreadPool.get(k);
+ ArrayList<MatrixMultTransposeTask> tasks = new
ArrayList<>();
+ int numgrp = groups.size();
+ int blklen = (int) (Math.ceil((double) numgrp /
(2 * k)));
+ for(int i = 0; i < 2 * k & i * blklen <
numColumns; i++)
+ tasks.add(new
MatrixMultTransposeTask(groups, result, i * blklen,
+ Math.min((i + 1) * blklen,
numgrp), v, overlapping));
+ List<Future<Object>> ret =
pool.invokeAll(tasks);
+ for(Future<Object> tret : ret)
+ tret.get(); // check for errors
+ pool.shutdown();
+ }
+ catch(InterruptedException | ExecutionException e) {
+ throw new DMLRuntimeException(e);
+ }
+ }
+ }
+
+ private static MatrixBlock leftMultByMatrix(List<ColGroup> colGroups,
MatrixBlock that, MatrixBlock ret, int k,
+ int numColumns, Pair<Integer, int[]> v, boolean overlapping) {
+ ret.allocateDenseBlock();
+ if(that.isInSparseFormat()) {
+ ret = leftMultBySparseMatrix(colGroups, that, ret, k,
numColumns, v);
+ }
+ else {
+ ret = leftMultByDenseMatrix(colGroups, that, ret, k,
numColumns, v, overlapping);
+ }
+
+ ret.setNonZeros(ret.getNumColumns() * ret.getNumRows());
+ return ret;
+ }
+
+ private static MatrixBlock leftMultByDenseMatrix(List<ColGroup>
colGroups, MatrixBlock that, MatrixBlock ret, int k,
+ int numColumns, Pair<Integer, int[]> v, boolean overlapping) {
+ DenseBlock db = that.getDenseBlock();
+ if(db == null)
+ throw new DMLRuntimeException("Invalid LeftMult By
Dense matrix, input matrix was sparse");
+
+ double[] retV = ret.getDenseBlockValues();
+ double[] thatV;
+ int blockU;
+ int blockL = 0;
+ for(ColGroup grp : colGroups)
+ if(grp instanceof ColGroupUncompressed)
+ ((ColGroupUncompressed)
grp).leftMultByMatrix(that, ret);
+
+ for(int b = 0; b < db.numBlocks(); b++) {
+ int blockSize = db.blockSize(b);
+ blockU = Math.min(blockL + blockSize, ret.getNumRows());
+ thatV = db.valuesAt(b);
+
+ if(k == 1 || overlapping) {
+ // Pair<Integer, int[]> v =
getMaxNumValues(colGroups);
+ for(int j = 0; j < colGroups.size(); j++) {
+ colGroups.get(j).leftMultByMatrix(thatV,
+ retV,
+ colGroups.get(j).getValues(),
+ that.getNumRows(),
+ ret.getNumColumns(),
+ 0,
+ ret.getNumRows(),
+ 0);
+ }
+ }
+ else {
+ try {
+ ExecutorService pool =
CommonThreadPool.get(k);
+ // compute remaining compressed column
groups in parallel
+ ArrayList<LeftMatrixMatrixMultTask>
tasks = new ArrayList<>();
+ int rowBlockSize = 1;
+ for(int blo = blockL; blo < blockU; blo
+= rowBlockSize) {
+ tasks.add(new
LeftMatrixMatrixMultTask(colGroups, thatV, retV, that.getNumRows(), numColumns,
+ blo, Math.min(blo +
rowBlockSize, blockU), blo - blockL, v));
+ }
+
+ List<Future<Object>> futures =
pool.invokeAll(tasks);
+
+ pool.shutdown();
+ for(Future<Object> future : futures)
+ future.get();
+ }
+ catch(InterruptedException | ExecutionException
e) {
+ throw new DMLRuntimeException(e);
+ }
+ }
+ blockL += blockSize;
+ }
+ return ret;
+ }
+
+ private static MatrixBlock leftMultByVectorTranspose(List<ColGroup>
colGroups, MatrixBlock vector,
+ MatrixBlock result, boolean doTranspose, boolean allocTmp,
Pair<Integer, int[]> v, boolean overlap) {
+
+ MatrixBlock rowVector = vector;
+ // Note that transpose here is a metadata operation since the
input is a vector.
+ if(doTranspose) {
+ rowVector = new MatrixBlock(1, vector.getNumRows(),
false);
+ LibMatrixReorg.transpose(vector, rowVector);
+ }
+
+ // initialize and allocate the result
+ result.reset();
+ result.allocateDenseBlock();
+
+ // setup memory pool for reuse
+ if(allocTmp) {
+ // Pair<Integer, int[]> v = getMaxNumValues(colGroups);
+ ColGroupValue.setupThreadLocalMemory(v.getLeft() + 1);
// +1 for efficiency in DDC groups.
+ for(int i = 0; i < colGroups.size(); i++) {
+
colGroups.get(i).leftMultByRowVector(rowVector.getDenseBlockValues(),
+ result.getDenseBlockValues(),
+ v.getRight()[i]);
+ }
+ }
+ else {
+
+ for(ColGroup grp : colGroups) {
+
grp.leftMultByRowVector(rowVector.getDenseBlockValues(),
result.getDenseBlockValues(), -1);
+ }
+ }
+
+ // delegate matrix-vector operation to each column group
+
+ // post-processing
+ if(allocTmp)
+ ColGroupValue.cleanupThreadLocalMemory();
+ result.recomputeNonZeros();
+
+ return result;
+ }
+
+ public static MatrixBlock leftMultByVectorTranspose(List<ColGroup>
colGroups, MatrixBlock vector,
+ MatrixBlock result, boolean doTranspose, int k, Pair<Integer,
int[]> v, boolean overlap) {
+ // transpose vector if required
+ MatrixBlock rowVector = vector;
+ if(doTranspose) {
+ rowVector = new MatrixBlock(1, vector.getNumRows(),
false);
+ LibMatrixReorg.transpose(vector, rowVector);
+ }
+
+ // initialize and allocate the result
+ result.reset();
+ result.allocateDenseBlock();
+
+ // multi-threaded execution
+ try {
+ // compute uncompressed column group in parallel
+ // ColGroupUncompressed uc = getUncompressedColGroup();
+ // if(uc != null)
+ // uc.leftMultByRowVector(rowVector, result, k);
+
+ // compute remaining compressed column groups in
parallel
+ ExecutorService pool =
CommonThreadPool.get(Math.min(colGroups.size(), k));
+ ArrayList<LeftMatrixVectorMultTask> tasks = new
ArrayList<>();
+
+ // if(overlap){
+ tasks.add(new LeftMatrixVectorMultTask(colGroups,
rowVector, result, v));
+ // } else{
+ // ArrayList<ColGroup>[] grpParts =
createStaticTaskPartitioning(colGroups, 4 * k, true);
+ // for(ArrayList<ColGroup> groups : grpParts)
+ // tasks.add(new LeftMatrixVectorMultTask(groups,
rowVector, result, v));
+ // }
+
+ List<Future<Object>> ret = pool.invokeAll(tasks);
+ pool.shutdown();
+ for(Future<Object> tmp : ret)
+ tmp.get();
+
+ }
+ catch(InterruptedException | ExecutionException e) {
+ LOG.error(e);
+ throw new DMLRuntimeException(e);
+ }
+
+ // post-processing
+ result.recomputeNonZeros();
+
+ return result;
+ }
+
+ private static MatrixBlock leftMultBySparseMatrix(List<ColGroup>
colGroups, MatrixBlock that, MatrixBlock ret,
+ int k, int numColumns, Pair<Integer, int[]> v) {
+
+ SparseBlock sb = that.getSparseBlock();
+ if(sb == null)
+ throw new DMLRuntimeException("Invalid Left Mult by
Sparse matrix, input matrix was dense");
+
+ for(ColGroup grp : colGroups) {
+ if(grp instanceof ColGroupUncompressed)
+ ((ColGroupUncompressed)
grp).leftMultByMatrix(that, ret);
+ }
+
+ if(k == 1) {
+ double[][] materialized = new
double[colGroups.size()][];
+ boolean containsOLE = false;
+ for(int i = 0; i < colGroups.size(); i++) {
+ materialized[i] = colGroups.get(i).getValues();
+ if(colGroups.get(i) instanceof ColGroupOLE) {
+ containsOLE = true;
+ }
+ }
+ double[] materializedRow = containsOLE ? new
double[CompressionSettings.BITMAP_BLOCK_SZ * 2] : null;
+
+ for(int r = 0; r < that.getNumRows(); r++) {
+ SparseRow row = sb.get(r);
+ if(row != null) {
+
+ for(int j = 0; j < colGroups.size();
j++) {
+
colGroups.get(j).leftMultBySparseMatrix(row.size(),
+ row.indexes(),
+ row.values(),
+
ret.getDenseBlockValues(),
+ v.getRight()[j],
+ materialized[j],
+ that.getNumRows(),
+ ret.getNumColumns(),
+ r,
+ materializedRow);
+ }
+ }
+ }
+ }
+ else {
+ ExecutorService pool = CommonThreadPool.get(k);
+ ArrayList<LeftMatrixSparseMatrixMultTask> tasks = new
ArrayList<>();
+ try {
+ // compute remaining compressed column groups
in parallel
+ // List<ColGroup>[] parts =
createStaticTaskPartitioningForSparseMatrixMult(colGroups, k, false);
+ // for(List<ColGroup> part : parts) {
+ tasks.add(new
LeftMatrixSparseMatrixMultTask(colGroups, sb, ret.getDenseBlockValues(),
+ that.getNumRows(), numColumns, v));
+ // }
+
+ List<Future<Object>> futures =
pool.invokeAll(tasks);
+ pool.shutdown();
+ for(Future<Object> future : futures)
+ future.get();
+ }
+ catch(InterruptedException | ExecutionException e) {
+ throw new DMLRuntimeException(e);
+ }
+ }
+
+ return ret;
+
+ }
+
+ private static void leftMultByTransposeSelf(List<ColGroup> groups,
MatrixBlock result, int gl, int gu,
+ Pair<Integer, int[]> v, boolean overlapping) {
+ final int numRows = groups.get(0).getNumRows();
+
+ // preallocated dense tmp matrix blocks
+ MatrixBlock lhs = new MatrixBlock(1, numRows, false);
+ MatrixBlock tmpret = new MatrixBlock(1, result.getNumColumns(),
false);
+ lhs.allocateDenseBlock();
+ tmpret.allocateDenseBlock();
+
+ // setup memory pool for reuse
+ ColGroupValue.setupThreadLocalMemory(v.getLeft() + 1);
+
+ // approach: for each colgroup, extract uncompressed columns
one at-a-time
+ // vector-matrix multiplies against remaining col groups
+ // for(int i = gl; i < gu; i++) {
+ // get current group and relevant col groups
+ // ColGroup group = groups.get(i);
+ // int[] ixgroup = group.getColIndices();
+ // List<ColGroup> tmpList = groups.subList(i, numGroups);
+
+ // if(group instanceof ColGroupDDC // single DDC group
+ // && ixgroup.length == 1 && !containsUC && numRows <
CompressionSettings.BITMAP_BLOCK_SZ) {
+ // // compute vector-matrix partial result
+ // leftMultByVectorTranspose(tmpList, (ColGroupDDC) group,
tmpret);
+
+ // // write partial results (disjoint non-zeros)
+ // LinearAlgebraUtils.copyNonZerosToUpperTriangle(result,
tmpret, ixgroup[0]);
+ // }
+ // else {
+ // for all uncompressed lhs columns vectors
+ for(int j = 0; j < result.getNumColumns(); j++) {
+ ColGroup.decompressToBlock(lhs, j, groups);
+
+ if(!lhs.isEmptyBlock(false)) {
+ // tmpret.reset();
+ // compute vector-matrix partial result
+ // leftMultByMatrix(groups,lhs, tmpret, false,
true, 0, 0, overlapping, 1, v );
+ leftMultByVectorTranspose(groups, lhs, tmpret,
false, true, v, overlapping);
+ // LOG.error(tmpret);
+
+ // write partial results (disjoint non-zeros)
+
LinearAlgebraUtils.copyNonZerosToUpperTriangle(result, tmpret, j);
+ }
+ lhs.reset();
+ // }
+ // }
+ }
+
+ // post processing
+ ColGroupValue.cleanupThreadLocalMemory();
+ }
+
+ private static class LeftMatrixVectorMultTask implements
Callable<Object> {
+ private final List<ColGroup> _groups;
+ private final MatrixBlock _vect;
+ private final MatrixBlock _ret;
+ private final Pair<Integer, int[]> _v;
+
+ protected LeftMatrixVectorMultTask(List<ColGroup> groups,
MatrixBlock vect, MatrixBlock ret,
+ Pair<Integer, int[]> v) {
+ _groups = groups;
+ _vect = vect;
+ _ret = ret;
+ _v = v;
+ }
+
+ @Override
+ public Object call() {
+ // setup memory pool for reuse
+ try {
+
ColGroupValue.setupThreadLocalMemory(_v.getLeft() + 1);
+ for(int i = 0; i < _groups.size(); i++) {
+ _groups.get(i)
+
.leftMultByRowVector(_vect.getDenseBlockValues(), _ret.getDenseBlockValues(),
_v.getRight()[i]);
+ }
+
+ ColGroupValue.cleanupThreadLocalMemory();
+ }
+ catch(Exception e) {
+ throw new DMLRuntimeException(e);
+ }
+ return null;
+ }
+ }
+
+ private static class LeftMatrixMatrixMultTask implements
Callable<Object> {
+ private final List<ColGroup> _group;
+ private final double[] _that;
+ private final double[] _ret;
+ private final int _numRows;
+ private final int _numCols;
+ private final int _rl;
+ private final int _ru;
+ private final int _vOff;
+ private final Pair<Integer, int[]> _v;
+
+ protected LeftMatrixMatrixMultTask(List<ColGroup> group,
double[] that, double[] ret, int numRows, int numCols,
+ int rl, int ru, int vOff, Pair<Integer, int[]> v) {
+ _group = group;
+ _that = that;
+ _ret = ret;
+ _numRows = numRows;
+ _numCols = numCols;
+ _rl = rl;
+ _ru = ru;
+ _vOff = vOff;
+ _v = v;
+ }
+
+ @Override
+ public Object call() {
+ // setup memory pool for reuse
+
+ double[][] materialized = new double[_group.size()][];
+ for(int i = 0; i < _group.size(); i++) {
+ materialized[i] = _group.get(i).getValues();
+ }
+ // Pair<Integer, int[]> v = getMaxNumValues(_group);
+ try {
+
ColGroupValue.setupThreadLocalMemory(_v.getLeft() + 1);
+ for(int j = 0; j < _group.size(); j++) {
+ _group.get(j).leftMultByMatrix(_that,
_ret, materialized[j], _numRows, _numCols, _rl, _ru, _vOff);
+ }
+ ColGroupValue.cleanupThreadLocalMemory();
+
+ }
+ catch(Exception e) {
+ throw new DMLRuntimeException(e);
+ }
+ return null;
+ }
+ }
+
+ private static class LeftMatrixSparseMatrixMultTask implements
Callable<Object> {
+ private final List<ColGroup> _group;
+ private final SparseBlock _that;
+ private final double[] _ret;
+ private final int _numRows;
+ private final int _numCols;
+ private final Pair<Integer, int[]> _v;
+
+ protected LeftMatrixSparseMatrixMultTask(List<ColGroup> group,
SparseBlock that, double[] ret, int numRows,
+ int numCols, Pair<Integer, int[]> v) {
+ _group = group;
+ _that = that;
+ _ret = ret;
+ _numRows = numRows;
+ _numCols = numCols;
+ _v = v;
+ }
+
+ @Override
+ public Object call() {
+ // setup memory pool for reuse
+
+ // double[][] materialized = new
double[_group.size()][];
+ // for(int i = 0; i < _group.size(); i++) {
+ // materialized[i] = _group.get(i).getValues();
+ // }
+
+ boolean containsOLE = false;
+ for(int j = 0; j < _group.size(); j++) {
+ if(_group.get(j) instanceof ColGroupOLE) {
+ containsOLE = true;
+ }
+ }
+ // Temporary Array to store 2 * block size in
+ double[] tmpA = containsOLE ? new
double[CompressionSettings.BITMAP_BLOCK_SZ * 2] : null;
+
+ ColGroupValue.setupThreadLocalMemory(_v.getLeft());
+ try {
+ for(int j = 0; j < _group.size(); j++) {
+ double[] materializedV =
_group.get(j).getValues();
+ for(int r = 0; r < _that.numRows();
r++) {
+ if(_that.get(r) != null) {
+
_group.get(j).leftMultBySparseMatrix(_that.get(r).size(),
+
_that.get(r).indexes(),
+
_that.get(r).values(),
+ _ret,
+
_v.getRight()[j],
+ materializedV,
+ _numRows,
+ _numCols,
+ r,
+ tmpA);
+ }
+ }
+ }
+ }
+ catch(Exception e) {
+ e.printStackTrace();
+ throw new DMLRuntimeException(e);
+ }
+ ColGroupValue.cleanupThreadLocalMemory();
+ return null;
+ }
+ }
+
+ private static class MatrixMultTransposeTask implements
Callable<Object> {
+ private final List<ColGroup> _groups;
+ private final MatrixBlock _ret;
+ private final int _gl;
+ private final int _gu;
+ private final Pair<Integer, int[]> _v;
+ private final boolean _overlapping;
+
+ protected MatrixMultTransposeTask(List<ColGroup> groups,
MatrixBlock ret, int gl, int gu,
+ Pair<Integer, int[]> v, boolean overlapping) {
+ _groups = groups;
+ _ret = ret;
+ _gl = gl;
+ _gu = gu;
+ _v = v;
+ _overlapping = overlapping;
+ }
+
+ @Override
+ public Object call() {
+ leftMultByTransposeSelf(_groups, _ret, _gl, _gu, _v,
_overlapping);
+ return null;
+ }
+ }
}
diff --git
a/src/main/java/org/apache/sysds/runtime/compress/lib/LibRightMultBy.java
b/src/main/java/org/apache/sysds/runtime/compress/lib/LibRightMultBy.java
index df0f45e..761e5a4 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/lib/LibRightMultBy.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/lib/LibRightMultBy.java
@@ -43,603 +43,603 @@ import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.util.CommonThreadPool;
public class LibRightMultBy {
- private static final Log LOG =
LogFactory.getLog(LibRightMultBy.class.getName());
-
- /**
- * Right multiply by matrix. Meaning a left hand side compressed matrix is
multiplied with a right hand side
- * uncompressed matrix.
- *
- * @param colGroups All Column groups in the compression
- * @param that The right hand side matrix
- * @param ret The MatrixBlock to return.
- * @param k The parallelization degree to use.
- * @param v The Precalculated counts and Maximum number of
tuple entries in the column groups.
- * @param allowOverlap Allow the multiplication to return an overlapped
matrix.
- * @return The Result Matrix, modified from the ret parameter.
- */
- public static MatrixBlock rightMultByMatrix(List<ColGroup> colGroups,
MatrixBlock that, MatrixBlock ret, int k,
- Pair<Integer, int[]> v, boolean allowOverlap) {
-
- boolean containsUncompressable = false;
- int distinctCount = 0;
- for(ColGroup g : colGroups) {
- if(g instanceof ColGroupValue) {
- distinctCount += ((ColGroupValue) g).getNumValues();
- }
- else {
- containsUncompressable = true;
- }
- }
- int rl = colGroups.get(0).getNumRows();
- int cl = that.getNumColumns();
- if(!allowOverlap || (containsUncompressable || distinctCount >= rl /
2)) {
- if(ret == null)
- ret = new MatrixBlock(rl, cl, false, rl * cl);
- else if(!(ret.getNumColumns() == cl && ret.getNumRows() == rl &&
ret.isAllocated()))
- ret.reset(rl, cl, false, rl * cl);
- ret.allocateDenseBlock();
- if(that.isInSparseFormat()) {
- ret = rightMultBySparseMatrix(colGroups, that, ret, k, v);
- }
- else {
- ret = rightMultByDenseMatrix(colGroups, that, ret, k, v);
-
- }
- ret.setNonZeros(ret.getNumColumns() * ret.getNumRows());
- }
- else {
- // Create an overlapping compressed Matrix Block.
- ret = new CompressedMatrixBlock(true);
-
- ret.setNumColumns(cl);
- ret.setNumRows(rl);
- CompressedMatrixBlock retC = (CompressedMatrixBlock) ret;
- retC.setOverlapping(true);
- if(that.isInSparseFormat()) {
- ret = rightMultBySparseMatrixCompressed(colGroups, that, retC,
k, v);
- }
- else {
- ret = rightMultByDenseMatrixCompressed(colGroups, that, retC,
k, v);
- }
- }
-
- return ret;
-
- }
-
- /**
- * Multi-threaded version of rightMultByVector.
- *
- * @param colGroups The Column groups used int the multiplication
- * @param vector matrix block vector to multiply with
- * @param result matrix block result to modify in the multiplication
- * @param k number of threads to use
- * @param v The Precalculated counts and Maximum number of tuple
entries in the column groups
- */
- public static void rightMultByVector(List<ColGroup> colGroups, MatrixBlock
vector, MatrixBlock result, int k,
- Pair<Integer, int[]> v) {
- // initialize and allocate the result
- result.allocateDenseBlock();
- if(k <= 1) {
- rightMultByVector(colGroups, vector, result, v);
- return;
- }
-
- // multi-threaded execution of all groups
- try {
- // ColGroupUncompressed uc = getUncompressedColGroup();
-
- // compute uncompressed column group in parallel
- // if(uc != null)
- // uc.rightMultByVector(vector, result, k);
-
- // compute remaining compressed column groups in parallel
- // note: OLE needs alignment to segment size, otherwise wrong entry
- ExecutorService pool = CommonThreadPool.get(k);
- int rlen = colGroups.get(0).getNumRows();
- int seqsz = CompressionSettings.BITMAP_BLOCK_SZ;
- int blklen = (int) (Math.ceil((double) rlen / k));
- blklen += (blklen % seqsz != 0) ? seqsz - blklen % seqsz : 0;
-
- ArrayList<RightMatrixVectorMultTask> tasks = new ArrayList<>();
- for(int i = 0; i < k & i * blklen < rlen; i++) {
- tasks.add(new RightMatrixVectorMultTask(colGroups, vector,
result, i * blklen,
- Math.min((i + 1) * blklen, rlen), v));
- }
-
- List<Future<Long>> ret = pool.invokeAll(tasks);
- pool.shutdown();
-
- // error handling and nnz aggregation
- long lnnz = 0;
- for(Future<Long> tmp : ret)
- lnnz += tmp.get();
- result.setNonZeros(lnnz);
- }
- catch(InterruptedException | ExecutionException e) {
- throw new DMLRuntimeException(e);
- }
- }
-
- /**
- * Multiply this matrix block by a column vector on the right.
- *
- * @param vector right-hand operand of the multiplication
- * @param result buffer to hold the result; must have the appropriate size
already
- * @param v The Precalculated counts and Maximum number of tuple
entries in the column groups.
- */
- private static void rightMultByVector(List<ColGroup> colGroups,
MatrixBlock vector, MatrixBlock result,
- Pair<Integer, int[]> v) {
-
- // delegate matrix-vector operation to each column group
- rightMultByVector(colGroups, vector, result, 0, result.getNumRows(),
v);
-
- // post-processing
- result.recomputeNonZeros();
- }
-
- private static MatrixBlock rightMultBySparseMatrix(List<ColGroup>
colGroups, MatrixBlock that, MatrixBlock ret,
- int k, Pair<Integer, int[]> v) {
- SparseBlock sb = that.getSparseBlock();
- double[] retV = ret.getDenseBlockValues();
-
- if(sb == null)
- throw new DMLRuntimeException("Invalid Right Mult by Sparse
matrix, input matrix was dense");
-
- for(ColGroup grp : colGroups) {
- if(grp instanceof ColGroupUncompressed)
- ((ColGroupUncompressed) grp).rightMultByMatrix(that, ret, 0,
ret.getNumColumns());
- }
-
- // Pair<Integer, int[]> v = Util.getMaxNumValues(colGroups);
- // if(k == 1) {
- for(int j = 0; j < colGroups.size(); j++) {
- double[] preAggregatedB = ((ColGroupValue)
colGroups.get(j)).preaggValues(v.getRight()[j],
- sb,
- colGroups.get(j).getValues(),
- 0,
- that.getNumColumns(),
- that.getNumColumns());
- colGroups.get(j).rightMultByMatrix(preAggregatedB,
- retV,
- that.getNumColumns(),
- 0,
- ret.getNumRows(),
- 0,
- that.getNumColumns());
-
- }
- // }
- // else {
- // ExecutorService pool = CommonThreadPool.get(k);
- // ArrayList<RightMultBySparseMatrixTask> tasks = new ArrayList<>();
- // try {
-
- // for(int j = 0; j < ret.getNumColumns(); j +=
CompressionSettings.BITMAP_BLOCK_SZ) {
- // tasks.add(new RightMultBySparseMatrixTask(colGroups, retV, sb,
materialized, v, numColumns, j,
- // Math.min(j + CompressionSettings.BITMAP_BLOCK_SZ,
ret.getNumColumns())));
- // }
-
- // List<Future<Object>> futures = pool.invokeAll(tasks);
- // pool.shutdown();
- // for(Future<Object> future : futures)
- // future.get();
- // }
- // catch(InterruptedException | ExecutionException e) {
- // throw new DMLRuntimeException(e);
- // }
- // }
-
- return ret;
- }
-
- private static MatrixBlock rightMultByDenseMatrix(List<ColGroup>
colGroups, MatrixBlock that, MatrixBlock ret,
- int k, Pair<Integer, int[]> v) {
-
- // long StartTime = System.currentTimeMillis();
- DenseBlock db = that.getDenseBlock();
- double[] retV = ret.getDenseBlockValues();
- double[] thatV;
-
- for(ColGroup grp : colGroups) {
- if(grp instanceof ColGroupUncompressed) {
- ((ColGroupUncompressed) grp).rightMultByMatrix(that, ret, 0,
ret.getNumRows());
- }
- }
-
- if(k == 1) {
- ColGroupValue.setupThreadLocalMemory((v.getLeft()));
- for(int b = 0; b < db.numBlocks(); b++) {
- // int blockSize = db.blockSize(b);
- thatV = db.valuesAt(b);
- for(int j = 0; j < colGroups.size(); j++) {
- int colBlockSize = 128;
- for(int i = 0; i < that.getNumColumns(); i +=
colBlockSize) {
- if(colGroups.get(j) instanceof ColGroupValue) {
- double[] preAggregatedB = ((ColGroupValue)
colGroups.get(j)).preaggValues(v.getRight()[j],
- thatV,
- colGroups.get(j).getValues(),
- i,
- Math.min(i + colBlockSize,
that.getNumColumns()),
- that.getNumColumns());
- int blklenRows =
CompressionSettings.BITMAP_BLOCK_SZ;
- for(int n = 0; n * blklenRows < ret.getNumRows();
n++) {
-
colGroups.get(j).rightMultByMatrix(preAggregatedB,
- retV,
- that.getNumColumns(),
- n * blklenRows,
- Math.min((n + 1) * blklenRows,
ret.getNumRows()),
- i,
- Math.min(i + colBlockSize,
that.getNumColumns()));
- }
- }
- }
- }
- }
- ColGroupValue.cleanupThreadLocalMemory();
- }
- else {
-
- thatV = db.valuesAt(0);
- ExecutorService pool = CommonThreadPool.get(k);
- ArrayList<RightMatrixMultTask> tasks = new ArrayList<>();
- ArrayList<RightMatrixPreAggregateTask> preTask = new
ArrayList<>(colGroups.size());
- // Pair<Integer, int[]> v;
- final int blkz = CompressionSettings.BITMAP_BLOCK_SZ;
- int blklenRows = (int) (Math.ceil((double) ret.getNumRows() / (2 *
k)));
-
- try {
- List<Future<double[]>> ag =
pool.invokeAll(preAggregate(colGroups, thatV, that, preTask, v));
- // DDC and RLE
- for(int j = 0; j * blklenRows < ret.getNumRows(); j++) {
- RightMatrixMultTask rmmt = new
RightMatrixMultTask(colGroups, retV, ag, v, that.getNumColumns(),
- j * blklenRows, Math.min((j + 1) * blklenRows,
ret.getNumRows()), 0, that.getNumColumns(),
- false, false);
- tasks.add(rmmt);
- }
- blklenRows += (blklenRows % blkz != 0) ? blkz - blklenRows %
blkz : 0;
- // OLE!
- for(int j = 0; j * blklenRows < ret.getNumRows(); j++) {
- RightMatrixMultTask rmmt = new
RightMatrixMultTask(colGroups, retV, ag, v, that.getNumColumns(),
- j * blklenRows, Math.min((j + 1) * blklenRows,
ret.getNumRows()), 0, that.getNumColumns(),
- false, true);
- tasks.add(rmmt);
- }
- for(Future<Object> future : pool.invokeAll(tasks))
- future.get();
- tasks.clear();
-
- }
- catch(InterruptedException | ExecutionException e) {
- throw new DMLRuntimeException(e);
- }
- }
-
- return ret;
- }
-
- private static MatrixBlock rightMultByDenseMatrixCompressed(List<ColGroup>
colGroups, MatrixBlock that,
- CompressedMatrixBlock ret, int k, Pair<Integer, int[]> v) {
-
- DenseBlock db = that.getDenseBlock();
- double[] thatV;
-
- for(ColGroup grp : colGroups) {
- if(grp instanceof ColGroupUncompressed) {
- throw new DMLCompressionException(
- "Right Mult by dense with compressed output is not
efficient to do with uncompressed Compressed ColGroups and therefore not
supported.");
- }
- }
-
- thatV = db.valuesAt(0);
- List<ColGroup> retCg = new ArrayList<ColGroup>();
- int[] newColIndexes = new int[that.getNumColumns()];
- for(int i = 0; i < that.getNumColumns(); i++) {
- newColIndexes[i] = i;
- }
- if(k == 1) {
- for(int j = 0; j < colGroups.size(); j++) {
- ColGroupValue g = (ColGroupValue) colGroups.get(j);
- double[] preAggregatedB = g.preaggValues(v.getRight()[j],
- thatV,
- g.getValues(),
- 0,
- that.getNumColumns(),
- that.getNumColumns(),
- new double[v.getRight()[j] * that.getNumColumns()]);
- retCg.add(g.copyAndSet(newColIndexes, preAggregatedB));
- }
- }
- else {
- thatV = db.valuesAt(0);
- ExecutorService pool = CommonThreadPool.get(k);
- ArrayList<RightMatrixPreAggregateTask> preTask = new
ArrayList<>(colGroups.size());
-
- try {
- List<Future<double[]>> ag =
pool.invokeAll(preAggregate(colGroups, thatV, that, preTask, v));
- for(int j = 0; j < colGroups.size(); j++) {
- retCg.add(((ColGroupValue)
colGroups.get(j)).copyAndSet(newColIndexes, ag.get(j).get()));
- }
- }
- catch(InterruptedException | ExecutionException e) {
- throw new DMLRuntimeException(e);
- }
- }
- ret.allocateColGroupList(retCg);
- ret.setOverlapping(true);
- ret.setNonZeros(-1);
-
- return ret;
- }
-
- private static MatrixBlock
rightMultBySparseMatrixCompressed(List<ColGroup> colGroups, MatrixBlock that,
- CompressedMatrixBlock ret, int k, Pair<Integer, int[]> v) {
-
- // long StartTime = System.currentTimeMillis();
- SparseBlock sb = that.getSparseBlock();
-
- for(ColGroup grp : colGroups) {
- if(grp instanceof ColGroupUncompressed) {
- throw new DMLCompressionException(
- "Right Mult by dense with compressed output is not
efficient to do with uncompressed Compressed ColGroups and therefore not
supported.");
- }
- }
-
- List<ColGroup> retCg = new ArrayList<ColGroup>();
- int[] newColIndexes = new int[that.getNumColumns()];
- for(int i = 0; i < that.getNumColumns(); i++) {
- newColIndexes[i] = i;
- }
- if(k == 1) {
- for(int j = 0; j < colGroups.size(); j++) {
- ColGroupValue g = (ColGroupValue) colGroups.get(j);
- double[] preAggregatedB = g.preaggValues(v.getRight()[j],
- sb,
- colGroups.get(j).getValues(),
- 0,
- that.getNumColumns(),
- that.getNumColumns(),
- new double[v.getRight()[j] * that.getNumColumns()]);
- retCg.add(g.copyAndSet(newColIndexes, preAggregatedB));
- }
- }
- else {
- ExecutorService pool = CommonThreadPool.get(k);
- ArrayList<RightMatrixPreAggregateSparseTask> preTask = new
ArrayList<>(colGroups.size());
-
- try {
- List<Future<double[]>> ag =
pool.invokeAll(preAggregate(colGroups, sb, that, preTask, v));
- for(int j = 0; j < colGroups.size(); j++) {
- retCg.add(((ColGroupValue)
colGroups.get(j)).copyAndSet(newColIndexes, ag.get(j).get()));
- }
- }
- catch(InterruptedException | ExecutionException e) {
- throw new DMLRuntimeException(e);
- }
- }
- ret.allocateColGroupList(retCg);
- ret.setOverlapping(true);
- ret.setNonZeros(-1);
-
- return ret;
- }
-
- private static ArrayList<RightMatrixPreAggregateTask>
preAggregate(List<ColGroup> colGroups, double[] thatV,
- MatrixBlock that, ArrayList<RightMatrixPreAggregateTask> preTask,
Pair<Integer, int[]> v) {
- preTask.clear();
- for(int h = 0; h < colGroups.size(); h++) {
- RightMatrixPreAggregateTask pAggT = new
RightMatrixPreAggregateTask((ColGroupValue) colGroups.get(h),
- v.getRight()[h], thatV, colGroups.get(h).getValues(), 0,
that.getNumColumns(), that.getNumColumns());
- preTask.add(pAggT);
- }
- return preTask;
- }
-
- private static ArrayList<RightMatrixPreAggregateSparseTask>
preAggregate(List<ColGroup> colGroups, SparseBlock sb,
- MatrixBlock that, ArrayList<RightMatrixPreAggregateSparseTask>
preTask, Pair<Integer, int[]> v) {
- preTask.clear();
- for(int h = 0; h < colGroups.size(); h++) {
- RightMatrixPreAggregateSparseTask pAggT = new
RightMatrixPreAggregateSparseTask(
- (ColGroupValue) colGroups.get(h), v.getRight()[h], sb,
colGroups.get(h).getValues(), 0,
- that.getNumColumns(), that.getNumColumns());
- preTask.add(pAggT);
- }
- return preTask;
- }
-
- private static void rightMultByVector(List<ColGroup> groups, MatrixBlock
vect, MatrixBlock ret, int rl, int ru,
- Pair<Integer, int[]> v) {
- // + 1 to enable containing a single 0 value in the dictionary that
was not materialized.
- // This is to handle the case of a DDC dictionary not materializing
the zero values.
- // A fine tradeoff!
- ColGroupValue.setupThreadLocalMemory(v.getLeft() + 1);
-
- // boolean cacheDDC1 = ru - rl > CompressionSettings.BITMAP_BLOCK_SZ *
2;
-
- // process uncompressed column group (overwrites output)
- // if(inclUC) {
- for(ColGroup grp : groups) {
- if(grp instanceof ColGroupUncompressed)
- ((ColGroupUncompressed) grp).rightMultByVector(vect, ret, rl,
ru);
- }
-
- // process cache-conscious DDC1 groups (adds to output)
-
- // if(cacheDDC1) {
- // ArrayList<ColGroupDDC1> tmp = new ArrayList<>();
- // for(ColGroup grp : groups)
- // if(grp instanceof ColGroupDDC1)
- // tmp.add((ColGroupDDC1) grp);
- // if(!tmp.isEmpty())
- // ColGroupDDC1.rightMultByVector(tmp.toArray(new ColGroupDDC1[0]),
vect, ret, rl, ru);
- // }
- // process remaining groups (adds to output)
- double[] values = ret.getDenseBlockValues();
- for(ColGroup grp : groups) {
- if(!(grp instanceof ColGroupUncompressed)) {
- grp.rightMultByVector(vect.getDenseBlockValues(), values, rl,
ru, grp.getValues());
- }
- }
-
- ColGroupValue.cleanupThreadLocalMemory();
-
- }
-
- private static class RightMatrixMultTask implements Callable<Object> {
- private final List<ColGroup> _colGroups;
- // private final double[] _thatV;
- private final double[] _retV;
- private final List<Future<double[]>> _aggB;
- private final Pair<Integer, int[]> _v;
- private final int _numColumns;
-
- private final int _rl;
- private final int _ru;
- private final int _cl;
- private final int _cu;
- private final boolean _mem;
- private final boolean _skipOle;
-
- protected RightMatrixMultTask(List<ColGroup> groups, double[] retV,
List<Future<double[]>> aggB,
- Pair<Integer, int[]> v, int numColumns, int rl, int ru, int cl,
int cu, boolean mem, boolean skipOle) {
- _colGroups = groups;
- // _thatV = thatV;
- _retV = retV;
- _aggB = aggB;
- _v = v;
- _numColumns = numColumns;
- _rl = rl;
- _ru = ru;
- _cl = cl;
- _cu = cu;
- _mem = mem;
- _skipOle = skipOle;
- }
-
- @Override
- public Object call() {
- try {
- if(_mem)
- ColGroupValue.setupThreadLocalMemory((_v.getLeft()));
- for(int j = 0; j < _colGroups.size(); j++) {
- if(_colGroups.get(j) instanceof ColGroupOLE) {
- if(_skipOle) {
- _colGroups.get(j)
- .rightMultByMatrix(_aggB.get(j).get(), _retV,
_numColumns, _rl, _ru, _cl, _cu);
- }
- }
- else {
- if(!_skipOle) {
- _colGroups.get(j)
- .rightMultByMatrix(_aggB.get(j).get(), _retV,
_numColumns, _rl, _ru, _cl, _cu);
- }
- }
- }
- if(_mem)
- ColGroupValue.cleanupThreadLocalMemory();
- return null;
- }
- catch(Exception e) {
- LOG.error(e);
- throw new DMLRuntimeException(e);
- }
- }
- }
-
- private static class RightMatrixPreAggregateTask implements
Callable<double[]> {
- private final ColGroupValue _colGroup;
- private final int _numVals;
- private final double[] _b;
- private final double[] _dict;
-
- private final int _cl;
- private final int _cu;
- private final int _cut;
-
- protected RightMatrixPreAggregateTask(ColGroupValue colGroup, int
numVals, double[] b, double[] dict, int cl,
- int cu, int cut) {
- _colGroup = colGroup;
- _numVals = numVals;
- _b = b;
- _dict = dict;
- _cl = cl;
- _cu = cu;
- _cut = cut;
- }
-
- @Override
- public double[] call() {
- try {
- return _colGroup.preaggValues(_numVals, _b, _dict, _cl, _cu,
_cut);
- }
- catch(Exception e) {
- LOG.error(e);
- throw new DMLRuntimeException(e);
- }
- }
- }
-
- private static class RightMatrixPreAggregateSparseTask implements
Callable<double[]> {
- private final ColGroupValue _colGroup;
- private final int _numVals;
- private final SparseBlock _b;
- private final double[] _dict;
-
- private final int _cl;
- private final int _cu;
- private final int _cut;
-
- protected RightMatrixPreAggregateSparseTask(ColGroupValue colGroup,
int numVals, SparseBlock b, double[] dict,
- int cl, int cu, int cut) {
- _colGroup = colGroup;
- _numVals = numVals;
- _b = b;
- _dict = dict;
- _cl = cl;
- _cu = cu;
- _cut = cut;
- }
-
- @Override
- public double[] call() {
- try {
- return _colGroup.preaggValues(_numVals, _b, _dict, _cl, _cu,
_cut);
- }
- catch(Exception e) {
- LOG.error(e);
- throw new DMLRuntimeException(e);
- }
- }
- }
-
- private static class RightMatrixVectorMultTask implements Callable<Long> {
- private final List<ColGroup> _groups;
- private final MatrixBlock _vect;
- private final MatrixBlock _ret;
- private final int _rl;
- private final int _ru;
- private final Pair<Integer, int[]> _v;
-
- protected RightMatrixVectorMultTask(List<ColGroup> groups, MatrixBlock
vect, MatrixBlock ret, int rl, int ru,
- Pair<Integer, int[]> v) {
- _groups = groups;
- _vect = vect;
- _ret = ret;
- _rl = rl;
- _ru = ru;
- _v = v;
- }
-
- @Override
- public Long call() {
- try {
- rightMultByVector(_groups, _vect, _ret, _rl, _ru, _v);
- return _ret.recomputeNonZeros(_rl, _ru - 1, 0, 0);
- }
- catch(Exception e) {
- LOG.error(e);
- throw new DMLRuntimeException(e);
- }
- }
- }
+ private static final Log LOG =
LogFactory.getLog(LibRightMultBy.class.getName());
+
+ /**
+ * Right multiply by matrix. Meaning a left hand side compressed matrix
is multiplied with a right hand side
+ * uncompressed matrix.
+ *
+ * @param colGroups All Column groups in the compression
+ * @param that The right hand side matrix
+ * @param ret The MatrixBlock to return.
+ * @param k The parallelization degree to use.
+ * @param v The Precalculated counts and Maximum
number of tuple entries in the column groups.
+ * @param allowOverlap Allow the multiplication to return an overlapped
matrix.
+ * @return The Result Matrix, modified from the ret parameter.
+ */
+ public static MatrixBlock rightMultByMatrix(List<ColGroup> colGroups,
MatrixBlock that, MatrixBlock ret, int k,
+ Pair<Integer, int[]> v, boolean allowOverlap) {
+
+ boolean containsUncompressable = false;
+ int distinctCount = 0;
+ for(ColGroup g : colGroups) {
+ if(g instanceof ColGroupValue) {
+ distinctCount += ((ColGroupValue)
g).getNumValues();
+ }
+ else {
+ containsUncompressable = true;
+ }
+ }
+ int rl = colGroups.get(0).getNumRows();
+ int cl = that.getNumColumns();
+ if(!allowOverlap || (containsUncompressable || distinctCount >=
rl / 2)) {
+ if(ret == null)
+ ret = new MatrixBlock(rl, cl, false, rl * cl);
+ else if(!(ret.getNumColumns() == cl && ret.getNumRows()
== rl && ret.isAllocated()))
+ ret.reset(rl, cl, false, rl * cl);
+ ret.allocateDenseBlock();
+ if(that.isInSparseFormat()) {
+ ret = rightMultBySparseMatrix(colGroups, that,
ret, k, v);
+ }
+ else {
+ ret = rightMultByDenseMatrix(colGroups, that,
ret, k, v);
+
+ }
+ ret.setNonZeros(ret.getNumColumns() * ret.getNumRows());
+ }
+ else {
+ // Create an overlapping compressed Matrix Block.
+ ret = new CompressedMatrixBlock(true);
+
+ ret.setNumColumns(cl);
+ ret.setNumRows(rl);
+ CompressedMatrixBlock retC = (CompressedMatrixBlock)
ret;
+ retC.setOverlapping(true);
+ if(that.isInSparseFormat()) {
+ ret =
rightMultBySparseMatrixCompressed(colGroups, that, retC, k, v);
+ }
+ else {
+ ret =
rightMultByDenseMatrixCompressed(colGroups, that, retC, k, v);
+ }
+ }
+
+ return ret;
+
+ }
+
+ /**
+ * Multi-threaded version of rightMultByVector.
+ *
+ * @param colGroups The Column groups used int the multiplication
+ * @param vector matrix block vector to multiply with
+ * @param result matrix block result to modify in the
multiplication
+ * @param k number of threads to use
+ * @param v The Precalculated counts and Maximum number of
tuple entries in the column groups
+ */
+ public static void rightMultByVector(List<ColGroup> colGroups,
MatrixBlock vector, MatrixBlock result, int k,
+ Pair<Integer, int[]> v) {
+ // initialize and allocate the result
+ result.allocateDenseBlock();
+ if(k <= 1) {
+ rightMultByVector(colGroups, vector, result, v);
+ return;
+ }
+
+ // multi-threaded execution of all groups
+ try {
+ // ColGroupUncompressed uc = getUncompressedColGroup();
+
+ // compute uncompressed column group in parallel
+ // if(uc != null)
+ // uc.rightMultByVector(vector, result, k);
+
+ // compute remaining compressed column groups in
parallel
+ // note: OLE needs alignment to segment size, otherwise
wrong entry
+ ExecutorService pool = CommonThreadPool.get(k);
+ int rlen = colGroups.get(0).getNumRows();
+ int seqsz = CompressionSettings.BITMAP_BLOCK_SZ;
+ int blklen = (int) (Math.ceil((double) rlen / k));
+ blklen += (blklen % seqsz != 0) ? seqsz - blklen %
seqsz : 0;
+
+ ArrayList<RightMatrixVectorMultTask> tasks = new
ArrayList<>();
+ for(int i = 0; i < k & i * blklen < rlen; i++) {
+ tasks.add(new
RightMatrixVectorMultTask(colGroups, vector, result, i * blklen,
+ Math.min((i + 1) * blklen, rlen), v));
+ }
+
+ List<Future<Long>> ret = pool.invokeAll(tasks);
+ pool.shutdown();
+
+ // error handling and nnz aggregation
+ long lnnz = 0;
+ for(Future<Long> tmp : ret)
+ lnnz += tmp.get();
+ result.setNonZeros(lnnz);
+ }
+ catch(InterruptedException | ExecutionException e) {
+ throw new DMLRuntimeException(e);
+ }
+ }
+
+ /**
+ * Multiply this matrix block by a column vector on the right.
+ *
+ * @param vector right-hand operand of the multiplication
+ * @param result buffer to hold the result; must have the appropriate
size already
+ * @param v The Precalculated counts and Maximum number of tuple
entries in the column groups.
+ */
+ private static void rightMultByVector(List<ColGroup> colGroups,
MatrixBlock vector, MatrixBlock result,
+ Pair<Integer, int[]> v) {
+
+ // delegate matrix-vector operation to each column group
+ rightMultByVector(colGroups, vector, result, 0,
result.getNumRows(), v);
+
+ // post-processing
+ result.recomputeNonZeros();
+ }
+
+ private static MatrixBlock rightMultBySparseMatrix(List<ColGroup>
colGroups, MatrixBlock that, MatrixBlock ret,
+ int k, Pair<Integer, int[]> v) {
+ SparseBlock sb = that.getSparseBlock();
+ double[] retV = ret.getDenseBlockValues();
+
+ if(sb == null)
+ throw new DMLRuntimeException("Invalid Right Mult by
Sparse matrix, input matrix was dense");
+
+ for(ColGroup grp : colGroups) {
+ if(grp instanceof ColGroupUncompressed)
+ ((ColGroupUncompressed)
grp).rightMultByMatrix(that, ret, 0, ret.getNumColumns());
+ }
+
+ // Pair<Integer, int[]> v = Util.getMaxNumValues(colGroups);
+ // if(k == 1) {
+ for(int j = 0; j < colGroups.size(); j++) {
+ double[] preAggregatedB = ((ColGroupValue)
colGroups.get(j)).preaggValues(v.getRight()[j],
+ sb,
+ colGroups.get(j).getValues(),
+ 0,
+ that.getNumColumns(),
+ that.getNumColumns());
+ colGroups.get(j).rightMultByMatrix(preAggregatedB,
+ retV,
+ that.getNumColumns(),
+ 0,
+ ret.getNumRows(),
+ 0,
+ that.getNumColumns());
+
+ }
+ // }
+ // else {
+ // ExecutorService pool = CommonThreadPool.get(k);
+ // ArrayList<RightMultBySparseMatrixTask> tasks = new
ArrayList<>();
+ // try {
+
+ // for(int j = 0; j < ret.getNumColumns(); j +=
CompressionSettings.BITMAP_BLOCK_SZ) {
+ // tasks.add(new RightMultBySparseMatrixTask(colGroups, retV,
sb, materialized, v, numColumns, j,
+ // Math.min(j + CompressionSettings.BITMAP_BLOCK_SZ,
ret.getNumColumns())));
+ // }
+
+ // List<Future<Object>> futures = pool.invokeAll(tasks);
+ // pool.shutdown();
+ // for(Future<Object> future : futures)
+ // future.get();
+ // }
+ // catch(InterruptedException | ExecutionException e) {
+ // throw new DMLRuntimeException(e);
+ // }
+ // }
+
+ return ret;
+ }
+
+ private static MatrixBlock rightMultByDenseMatrix(List<ColGroup>
colGroups, MatrixBlock that, MatrixBlock ret,
+ int k, Pair<Integer, int[]> v) {
+
+ // long StartTime = System.currentTimeMillis();
+ DenseBlock db = that.getDenseBlock();
+ double[] retV = ret.getDenseBlockValues();
+ double[] thatV;
+
+ for(ColGroup grp : colGroups) {
+ if(grp instanceof ColGroupUncompressed) {
+ ((ColGroupUncompressed)
grp).rightMultByMatrix(that, ret, 0, ret.getNumRows());
+ }
+ }
+
+ if(k == 1) {
+ ColGroupValue.setupThreadLocalMemory((v.getLeft()));
+ for(int b = 0; b < db.numBlocks(); b++) {
+ // int blockSize = db.blockSize(b);
+ thatV = db.valuesAt(b);
+ for(int j = 0; j < colGroups.size(); j++) {
+ int colBlockSize = 128;
+ for(int i = 0; i <
that.getNumColumns(); i += colBlockSize) {
+ if(colGroups.get(j) instanceof
ColGroupValue) {
+ double[] preAggregatedB
= ((ColGroupValue) colGroups.get(j)).preaggValues(v.getRight()[j],
+ thatV,
+
colGroups.get(j).getValues(),
+ i,
+ Math.min(i +
colBlockSize, that.getNumColumns()),
+
that.getNumColumns());
+ int blklenRows =
CompressionSettings.BITMAP_BLOCK_SZ;
+ for(int n = 0; n *
blklenRows < ret.getNumRows(); n++) {
+
colGroups.get(j).rightMultByMatrix(preAggregatedB,
+ retV,
+
that.getNumColumns(),
+ n *
blklenRows,
+
Math.min((n + 1) * blklenRows, ret.getNumRows()),
+ i,
+
Math.min(i + colBlockSize, that.getNumColumns()));
+ }
+ }
+ }
+ }
+ }
+ ColGroupValue.cleanupThreadLocalMemory();
+ }
+ else {
+
+ thatV = db.valuesAt(0);
+ ExecutorService pool = CommonThreadPool.get(k);
+ ArrayList<RightMatrixMultTask> tasks = new
ArrayList<>();
+ ArrayList<RightMatrixPreAggregateTask> preTask = new
ArrayList<>(colGroups.size());
+ // Pair<Integer, int[]> v;
+ final int blkz = CompressionSettings.BITMAP_BLOCK_SZ;
+ int blklenRows = (int) (Math.ceil((double)
ret.getNumRows() / (2 * k)));
+
+ try {
+ List<Future<double[]>> ag =
pool.invokeAll(preAggregate(colGroups, thatV, that, preTask, v));
+ // DDC and RLE
+ for(int j = 0; j * blklenRows <
ret.getNumRows(); j++) {
+ RightMatrixMultTask rmmt = new
RightMatrixMultTask(colGroups, retV, ag, v, that.getNumColumns(),
+ j * blklenRows, Math.min((j +
1) * blklenRows, ret.getNumRows()), 0, that.getNumColumns(),
+ false, false);
+ tasks.add(rmmt);
+ }
+ blklenRows += (blklenRows % blkz != 0) ? blkz -
blklenRows % blkz : 0;
+ // OLE!
+ for(int j = 0; j * blklenRows <
ret.getNumRows(); j++) {
+ RightMatrixMultTask rmmt = new
RightMatrixMultTask(colGroups, retV, ag, v, that.getNumColumns(),
+ j * blklenRows, Math.min((j +
1) * blklenRows, ret.getNumRows()), 0, that.getNumColumns(),
+ false, true);
+ tasks.add(rmmt);
+ }
+ for(Future<Object> future :
pool.invokeAll(tasks))
+ future.get();
+ tasks.clear();
+
+ }
+ catch(InterruptedException | ExecutionException e) {
+ throw new DMLRuntimeException(e);
+ }
+ }
+
+ return ret;
+ }
+
+ private static MatrixBlock
rightMultByDenseMatrixCompressed(List<ColGroup> colGroups, MatrixBlock that,
+ CompressedMatrixBlock ret, int k, Pair<Integer, int[]> v) {
+
+ DenseBlock db = that.getDenseBlock();
+ double[] thatV;
+
+ for(ColGroup grp : colGroups) {
+ if(grp instanceof ColGroupUncompressed) {
+ throw new DMLCompressionException(
+ "Right Mult by dense with compressed
output is not efficient to do with uncompressed Compressed ColGroups and
therefore not supported.");
+ }
+ }
+
+ thatV = db.valuesAt(0);
+ List<ColGroup> retCg = new ArrayList<>();
+ int[] newColIndexes = new int[that.getNumColumns()];
+ for(int i = 0; i < that.getNumColumns(); i++) {
+ newColIndexes[i] = i;
+ }
+ if(k == 1) {
+ for(int j = 0; j < colGroups.size(); j++) {
+ ColGroupValue g = (ColGroupValue)
colGroups.get(j);
+ double[] preAggregatedB =
g.preaggValues(v.getRight()[j],
+ thatV,
+ g.getValues(),
+ 0,
+ that.getNumColumns(),
+ that.getNumColumns(),
+ new double[v.getRight()[j] *
that.getNumColumns()]);
+ retCg.add(g.copyAndSet(newColIndexes,
preAggregatedB));
+ }
+ }
+ else {
+ thatV = db.valuesAt(0);
+ ExecutorService pool = CommonThreadPool.get(k);
+ ArrayList<RightMatrixPreAggregateTask> preTask = new
ArrayList<>(colGroups.size());
+
+ try {
+ List<Future<double[]>> ag =
pool.invokeAll(preAggregate(colGroups, thatV, that, preTask, v));
+ for(int j = 0; j < colGroups.size(); j++) {
+ retCg.add(((ColGroupValue)
colGroups.get(j)).copyAndSet(newColIndexes, ag.get(j).get()));
+ }
+ }
+ catch(InterruptedException | ExecutionException e) {
+ throw new DMLRuntimeException(e);
+ }
+ }
+ ret.allocateColGroupList(retCg);
+ ret.setOverlapping(true);
+ ret.setNonZeros(-1);
+
+ return ret;
+ }
+
+ private static MatrixBlock
rightMultBySparseMatrixCompressed(List<ColGroup> colGroups, MatrixBlock that,
+ CompressedMatrixBlock ret, int k, Pair<Integer, int[]> v) {
+
+ // long StartTime = System.currentTimeMillis();
+ SparseBlock sb = that.getSparseBlock();
+
+ for(ColGroup grp : colGroups) {
+ if(grp instanceof ColGroupUncompressed) {
+ throw new DMLCompressionException(
+ "Right Mult by dense with compressed
output is not efficient to do with uncompressed Compressed ColGroups and
therefore not supported.");
+ }
+ }
+
+ List<ColGroup> retCg = new ArrayList<>();
+ int[] newColIndexes = new int[that.getNumColumns()];
+ for(int i = 0; i < that.getNumColumns(); i++) {
+ newColIndexes[i] = i;
+ }
+ if(k == 1) {
+ for(int j = 0; j < colGroups.size(); j++) {
+ ColGroupValue g = (ColGroupValue)
colGroups.get(j);
+ double[] preAggregatedB =
g.preaggValues(v.getRight()[j],
+ sb,
+ colGroups.get(j).getValues(),
+ 0,
+ that.getNumColumns(),
+ that.getNumColumns(),
+ new double[v.getRight()[j] *
that.getNumColumns()]);
+ retCg.add(g.copyAndSet(newColIndexes,
preAggregatedB));
+ }
+ }
+ else {
+ ExecutorService pool = CommonThreadPool.get(k);
+ ArrayList<RightMatrixPreAggregateSparseTask> preTask =
new ArrayList<>(colGroups.size());
+
+ try {
+ List<Future<double[]>> ag =
pool.invokeAll(preAggregate(colGroups, sb, that, preTask, v));
+ for(int j = 0; j < colGroups.size(); j++) {
+ retCg.add(((ColGroupValue)
colGroups.get(j)).copyAndSet(newColIndexes, ag.get(j).get()));
+ }
+ }
+ catch(InterruptedException | ExecutionException e) {
+ throw new DMLRuntimeException(e);
+ }
+ }
+ ret.allocateColGroupList(retCg);
+ ret.setOverlapping(true);
+ ret.setNonZeros(-1);
+
+ return ret;
+ }
+
+ private static ArrayList<RightMatrixPreAggregateTask>
preAggregate(List<ColGroup> colGroups, double[] thatV,
+ MatrixBlock that, ArrayList<RightMatrixPreAggregateTask>
preTask, Pair<Integer, int[]> v) {
+ preTask.clear();
+ for(int h = 0; h < colGroups.size(); h++) {
+ RightMatrixPreAggregateTask pAggT = new
RightMatrixPreAggregateTask((ColGroupValue) colGroups.get(h),
+ v.getRight()[h], thatV,
colGroups.get(h).getValues(), 0, that.getNumColumns(), that.getNumColumns());
+ preTask.add(pAggT);
+ }
+ return preTask;
+ }
+
+ private static ArrayList<RightMatrixPreAggregateSparseTask>
preAggregate(List<ColGroup> colGroups, SparseBlock sb,
+ MatrixBlock that, ArrayList<RightMatrixPreAggregateSparseTask>
preTask, Pair<Integer, int[]> v) {
+ preTask.clear();
+ for(int h = 0; h < colGroups.size(); h++) {
+ RightMatrixPreAggregateSparseTask pAggT = new
RightMatrixPreAggregateSparseTask(
+ (ColGroupValue) colGroups.get(h),
v.getRight()[h], sb, colGroups.get(h).getValues(), 0,
+ that.getNumColumns(), that.getNumColumns());
+ preTask.add(pAggT);
+ }
+ return preTask;
+ }
+
+ private static void rightMultByVector(List<ColGroup> groups,
MatrixBlock vect, MatrixBlock ret, int rl, int ru,
+ Pair<Integer, int[]> v) {
+ // + 1 to enable containing a single 0 value in the dictionary
that was not materialized.
+ // This is to handle the case of a DDC dictionary not
materializing the zero values.
+ // A fine tradeoff!
+ ColGroupValue.setupThreadLocalMemory(v.getLeft() + 1);
+
+ // boolean cacheDDC1 = ru - rl >
CompressionSettings.BITMAP_BLOCK_SZ * 2;
+
+ // process uncompressed column group (overwrites output)
+ // if(inclUC) {
+ for(ColGroup grp : groups) {
+ if(grp instanceof ColGroupUncompressed)
+ ((ColGroupUncompressed)
grp).rightMultByVector(vect, ret, rl, ru);
+ }
+
+ // process cache-conscious DDC1 groups (adds to output)
+
+ // if(cacheDDC1) {
+ // ArrayList<ColGroupDDC1> tmp = new ArrayList<>();
+ // for(ColGroup grp : groups)
+ // if(grp instanceof ColGroupDDC1)
+ // tmp.add((ColGroupDDC1) grp);
+ // if(!tmp.isEmpty())
+ // ColGroupDDC1.rightMultByVector(tmp.toArray(new
ColGroupDDC1[0]), vect, ret, rl, ru);
+ // }
+ // process remaining groups (adds to output)
+ double[] values = ret.getDenseBlockValues();
+ for(ColGroup grp : groups) {
+ if(!(grp instanceof ColGroupUncompressed)) {
+
grp.rightMultByVector(vect.getDenseBlockValues(), values, rl, ru,
grp.getValues());
+ }
+ }
+
+ ColGroupValue.cleanupThreadLocalMemory();
+
+ }
+
+ private static class RightMatrixMultTask implements Callable<Object> {
+ private final List<ColGroup> _colGroups;
+ // private final double[] _thatV;
+ private final double[] _retV;
+ private final List<Future<double[]>> _aggB;
+ private final Pair<Integer, int[]> _v;
+ private final int _numColumns;
+
+ private final int _rl;
+ private final int _ru;
+ private final int _cl;
+ private final int _cu;
+ private final boolean _mem;
+ private final boolean _skipOle;
+
+ protected RightMatrixMultTask(List<ColGroup> groups, double[]
retV, List<Future<double[]>> aggB,
+ Pair<Integer, int[]> v, int numColumns, int rl, int ru,
int cl, int cu, boolean mem, boolean skipOle) {
+ _colGroups = groups;
+ // _thatV = thatV;
+ _retV = retV;
+ _aggB = aggB;
+ _v = v;
+ _numColumns = numColumns;
+ _rl = rl;
+ _ru = ru;
+ _cl = cl;
+ _cu = cu;
+ _mem = mem;
+ _skipOle = skipOle;
+ }
+
+ @Override
+ public Object call() {
+ try {
+ if(_mem)
+
ColGroupValue.setupThreadLocalMemory((_v.getLeft()));
+ for(int j = 0; j < _colGroups.size(); j++) {
+ if(_colGroups.get(j) instanceof
ColGroupOLE) {
+ if(_skipOle) {
+ _colGroups.get(j)
+
.rightMultByMatrix(_aggB.get(j).get(), _retV, _numColumns, _rl, _ru, _cl, _cu);
+ }
+ }
+ else {
+ if(!_skipOle) {
+ _colGroups.get(j)
+
.rightMultByMatrix(_aggB.get(j).get(), _retV, _numColumns, _rl, _ru, _cl, _cu);
+ }
+ }
+ }
+ if(_mem)
+
ColGroupValue.cleanupThreadLocalMemory();
+ return null;
+ }
+ catch(Exception e) {
+ LOG.error(e);
+ throw new DMLRuntimeException(e);
+ }
+ }
+ }
+
+ private static class RightMatrixPreAggregateTask implements
Callable<double[]> {
+ private final ColGroupValue _colGroup;
+ private final int _numVals;
+ private final double[] _b;
+ private final double[] _dict;
+
+ private final int _cl;
+ private final int _cu;
+ private final int _cut;
+
+ protected RightMatrixPreAggregateTask(ColGroupValue colGroup,
int numVals, double[] b, double[] dict, int cl,
+ int cu, int cut) {
+ _colGroup = colGroup;
+ _numVals = numVals;
+ _b = b;
+ _dict = dict;
+ _cl = cl;
+ _cu = cu;
+ _cut = cut;
+ }
+
+ @Override
+ public double[] call() {
+ try {
+ return _colGroup.preaggValues(_numVals, _b,
_dict, _cl, _cu, _cut);
+ }
+ catch(Exception e) {
+ LOG.error(e);
+ throw new DMLRuntimeException(e);
+ }
+ }
+ }
+
+ private static class RightMatrixPreAggregateSparseTask implements
Callable<double[]> {
+ private final ColGroupValue _colGroup;
+ private final int _numVals;
+ private final SparseBlock _b;
+ private final double[] _dict;
+
+ private final int _cl;
+ private final int _cu;
+ private final int _cut;
+
+ protected RightMatrixPreAggregateSparseTask(ColGroupValue
colGroup, int numVals, SparseBlock b, double[] dict,
+ int cl, int cu, int cut) {
+ _colGroup = colGroup;
+ _numVals = numVals;
+ _b = b;
+ _dict = dict;
+ _cl = cl;
+ _cu = cu;
+ _cut = cut;
+ }
+
+ @Override
+ public double[] call() {
+ try {
+ return _colGroup.preaggValues(_numVals, _b,
_dict, _cl, _cu, _cut);
+ }
+ catch(Exception e) {
+ LOG.error(e);
+ throw new DMLRuntimeException(e);
+ }
+ }
+ }
+
+ private static class RightMatrixVectorMultTask implements
Callable<Long> {
+ private final List<ColGroup> _groups;
+ private final MatrixBlock _vect;
+ private final MatrixBlock _ret;
+ private final int _rl;
+ private final int _ru;
+ private final Pair<Integer, int[]> _v;
+
+ protected RightMatrixVectorMultTask(List<ColGroup> groups,
MatrixBlock vect, MatrixBlock ret, int rl, int ru,
+ Pair<Integer, int[]> v) {
+ _groups = groups;
+ _vect = vect;
+ _ret = ret;
+ _rl = rl;
+ _ru = ru;
+ _v = v;
+ }
+
+ @Override
+ public Long call() {
+ try {
+ rightMultByVector(_groups, _vect, _ret, _rl,
_ru, _v);
+ return _ret.recomputeNonZeros(_rl, _ru - 1, 0,
0);
+ }
+ catch(Exception e) {
+ LOG.error(e);
+ throw new DMLRuntimeException(e);
+ }
+ }
+ }
}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/LibScalar.java
b/src/main/java/org/apache/sysds/runtime/compress/lib/LibScalar.java
index f555513..4f9020a 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/lib/LibScalar.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/lib/LibScalar.java
@@ -47,148 +47,148 @@ import org.apache.sysds.runtime.util.CommonThreadPool;
public class LibScalar {
- // private static final Log LOG =
LogFactory.getLog(LibScalar.class.getName());
- private static final int MINIMUM_PARALLEL_SIZE = 8096;
-
- public static MatrixBlock scalarOperations(ScalarOperator sop,
CompressedMatrixBlock m1, CompressedMatrixBlock ret,
- boolean overlapping) {
- // LOG.error(sop);
- if(sop instanceof LeftScalarOperator) {
- if(sop.fn instanceof Minus) {
- m1 = (CompressedMatrixBlock) scalarOperations(new
RightScalarOperator(Multiply.getMultiplyFnObject(),
- -1), m1, ret, overlapping);
- return scalarOperations(new
RightScalarOperator(Plus.getPlusFnObject(), sop.getConstant()),
- m1,
- ret,
- overlapping);
- }
- else if(sop.fn instanceof Power2) {
- throw new DMLCompressionException("Left Power does not make
sense.");
- // List<ColGroup> newColGroups = new ArrayList<>();
- // double v = sop.executeScalar(0);
-
- // double[] values = new double[m1.getNumColumns()];
- // Arrays.fill(values, v);
-
- // int[] colIndexes = new int[m1.getNumColumns()];
- // for(int i = 0; i < colIndexes.length; i++) {
- // colIndexes[i] = i;
- // }
- // newColGroups.add(new ColGroupConst(colIndexes,
ret.getNumRows(), new Dictionary(values)));
- // ret.allocateColGroupList(newColGroups);
- // ret.setNonZeros(ret.getNumColumns() * ret.getNumRows());
- // return ret;
- }
-
- }
-
- List<ColGroup> colGroups = m1.getColGroups();
- if(overlapping && !(sop.fn instanceof Multiply)) {
- if(sop.fn instanceof Plus || sop.fn instanceof Minus) {
-
- // If the colGroup is overlapping we know there are no
incompressable colGroups.
- List<ColGroup> newColGroups = new ArrayList<>();
- for(ColGroup grp : colGroups) {
- ColGroupValue g = (ColGroupValue) grp;
- newColGroups.add(g.copy());
- }
- int[] colIndexes = newColGroups.get(0).getColIndices();
- double v = sop.executeScalar(0);
- double[] values = new double[colIndexes.length];
- Arrays.fill(values, v);
- newColGroups.add(new ColGroupConst(colIndexes,
ret.getNumRows(), new Dictionary(values)));
- ret.allocateColGroupList(newColGroups);
- ret.setOverlapping(true);
- ret.setNonZeros(-1);
- }
- }
- else {
-
- if(sop.getNumThreads() > 1) {
- parallelScalarOperations(sop, colGroups, ret,
sop.getNumThreads());
- }
- else {
- // Apply the operation to each of the column groups.
- // Most implementations will only modify metadata.
- List<ColGroup> newColGroups = new ArrayList<>();
- for(ColGroup grp : colGroups) {
- newColGroups.add(grp.scalarOperation(sop));
- }
- ret.allocateColGroupList(newColGroups);
- }
- ret.setNonZeros(-1);
- ret.setOverlapping(m1.isOverlapping());
- }
-
- return ret;
-
- }
-
- private static void parallelScalarOperations(ScalarOperator sop,
List<ColGroup> colGroups,
- CompressedMatrixBlock ret, int k) {
- ExecutorService pool = CommonThreadPool.get(k);
- List<ScalarTask> tasks = partition(sop, colGroups);
- try {
- List<Future<List<ColGroup>>> rtasks = pool.invokeAll(tasks);
- pool.shutdown();
- List<ColGroup> newColGroups = new ArrayList<>();
- for(Future<List<ColGroup>> f : rtasks) {
- newColGroups.addAll(f.get());
- }
- ret.allocateColGroupList(newColGroups);
- }
- catch(InterruptedException | ExecutionException e) {
- throw new DMLRuntimeException(e);
- }
- }
-
- private static List<ScalarTask> partition(ScalarOperator sop,
List<ColGroup> colGroups) {
- ArrayList<ScalarTask> tasks = new ArrayList<>();
- ArrayList<ColGroup> small = new ArrayList<>();
- for(ColGroup grp : colGroups) {
- if(grp instanceof ColGroupUncompressed) {
- ArrayList<ColGroup> uc = new ArrayList<>();
- uc.add(grp);
- tasks.add(new ScalarTask(uc, sop));
- }
- else {
- int nv = ((ColGroupValue) grp).getNumValues() *
grp.getColIndices().length;
- if(nv < MINIMUM_PARALLEL_SIZE) {
- small.add(grp);
- }
- else {
- ArrayList<ColGroup> large = new ArrayList<>();
- large.add(grp);
- tasks.add(new ScalarTask(large, sop));
- }
- }
- if(small.size() > 10) {
- tasks.add(new ScalarTask(small, sop));
- small = new ArrayList<>();
- }
- }
- if(small.size() > 0) {
- tasks.add(new ScalarTask(small, sop));
- }
- return tasks;
- }
-
- private static class ScalarTask implements Callable<List<ColGroup>> {
- private final List<ColGroup> _colGroups;
- private final ScalarOperator _sop;
-
- protected ScalarTask(List<ColGroup> colGroups, ScalarOperator sop) {
- _colGroups = colGroups;
- _sop = sop;
- }
-
- @Override
- public List<ColGroup> call() {
- List<ColGroup> res = new ArrayList<>();
- for(ColGroup x : _colGroups) {
- res.add(x.scalarOperation(_sop));
- }
- return res;
- }
- }
+ // private static final Log LOG =
LogFactory.getLog(LibScalar.class.getName());
+ private static final int MINIMUM_PARALLEL_SIZE = 8096;
+
+ public static MatrixBlock scalarOperations(ScalarOperator sop,
CompressedMatrixBlock m1,
+ CompressedMatrixBlock ret, boolean overlapping)
+ {
+ if(sop instanceof LeftScalarOperator) {
+ if(sop.fn instanceof Minus) {
+ m1 = (CompressedMatrixBlock)
scalarOperations(new RightScalarOperator(Multiply.getMultiplyFnObject(),
+ -1), m1, ret, overlapping);
+ return scalarOperations(new
RightScalarOperator(Plus.getPlusFnObject(), sop.getConstant()),
+ m1,
+ ret,
+ overlapping);
+ }
+ else if(sop.fn instanceof Power2) {
+ throw new DMLCompressionException("Left Power
does not make sense.");
+ // List<ColGroup> newColGroups = new
ArrayList<>();
+ // double v = sop.executeScalar(0);
+
+ // double[] values = new
double[m1.getNumColumns()];
+ // Arrays.fill(values, v);
+
+ // int[] colIndexes = new
int[m1.getNumColumns()];
+ // for(int i = 0; i < colIndexes.length; i++) {
+ // colIndexes[i] = i;
+ // }
+ // newColGroups.add(new
ColGroupConst(colIndexes, ret.getNumRows(), new Dictionary(values)));
+ // ret.allocateColGroupList(newColGroups);
+ // ret.setNonZeros(ret.getNumColumns() *
ret.getNumRows());
+ // return ret;
+ }
+
+ }
+
+ List<ColGroup> colGroups = m1.getColGroups();
+ if(overlapping && !(sop.fn instanceof Multiply)) {
+ if(sop.fn instanceof Plus || sop.fn instanceof Minus) {
+
+ // If the colGroup is overlapping we know there
are no incompressable colGroups.
+ List<ColGroup> newColGroups = new ArrayList<>();
+ for(ColGroup grp : colGroups) {
+ ColGroupValue g = (ColGroupValue) grp;
+ newColGroups.add(g.copy());
+ }
+ int[] colIndexes =
newColGroups.get(0).getColIndices();
+ double v = sop.executeScalar(0);
+ double[] values = new double[colIndexes.length];
+ Arrays.fill(values, v);
+ newColGroups.add(new ColGroupConst(colIndexes,
ret.getNumRows(), new Dictionary(values)));
+ ret.allocateColGroupList(newColGroups);
+ ret.setOverlapping(true);
+ ret.setNonZeros(-1);
+ }
+ }
+ else {
+
+ if(sop.getNumThreads() > 1) {
+ parallelScalarOperations(sop, colGroups, ret,
sop.getNumThreads());
+ }
+ else {
+ // Apply the operation to each of the column
groups.
+ // Most implementations will only modify
metadata.
+ List<ColGroup> newColGroups = new ArrayList<>();
+ for(ColGroup grp : colGroups) {
+
newColGroups.add(grp.scalarOperation(sop));
+ }
+ ret.allocateColGroupList(newColGroups);
+ }
+ ret.setNonZeros(-1);
+ ret.setOverlapping(m1.isOverlapping());
+ }
+
+ return ret;
+
+ }
+
+ private static void parallelScalarOperations(ScalarOperator sop,
List<ColGroup> colGroups,
+ CompressedMatrixBlock ret, int k) {
+ ExecutorService pool = CommonThreadPool.get(k);
+ List<ScalarTask> tasks = partition(sop, colGroups);
+ try {
+ List<Future<List<ColGroup>>> rtasks =
pool.invokeAll(tasks);
+ pool.shutdown();
+ List<ColGroup> newColGroups = new ArrayList<>();
+ for(Future<List<ColGroup>> f : rtasks) {
+ newColGroups.addAll(f.get());
+ }
+ ret.allocateColGroupList(newColGroups);
+ }
+ catch(InterruptedException | ExecutionException e) {
+ throw new DMLRuntimeException(e);
+ }
+ }
+
+ private static List<ScalarTask> partition(ScalarOperator sop,
List<ColGroup> colGroups) {
+ ArrayList<ScalarTask> tasks = new ArrayList<>();
+ ArrayList<ColGroup> small = new ArrayList<>();
+ for(ColGroup grp : colGroups) {
+ if(grp instanceof ColGroupUncompressed) {
+ ArrayList<ColGroup> uc = new ArrayList<>();
+ uc.add(grp);
+ tasks.add(new ScalarTask(uc, sop));
+ }
+ else {
+ int nv = ((ColGroupValue) grp).getNumValues() *
grp.getColIndices().length;
+ if(nv < MINIMUM_PARALLEL_SIZE) {
+ small.add(grp);
+ }
+ else {
+ ArrayList<ColGroup> large = new
ArrayList<>();
+ large.add(grp);
+ tasks.add(new ScalarTask(large, sop));
+ }
+ }
+ if(small.size() > 10) {
+ tasks.add(new ScalarTask(small, sop));
+ small = new ArrayList<>();
+ }
+ }
+ if(small.size() > 0) {
+ tasks.add(new ScalarTask(small, sop));
+ }
+ return tasks;
+ }
+
+ private static class ScalarTask implements Callable<List<ColGroup>> {
+ private final List<ColGroup> _colGroups;
+ private final ScalarOperator _sop;
+
+ protected ScalarTask(List<ColGroup> colGroups, ScalarOperator
sop) {
+ _colGroups = colGroups;
+ _sop = sop;
+ }
+
+ @Override
+ public List<ColGroup> call() {
+ List<ColGroup> res = new ArrayList<>();
+ for(ColGroup x : _colGroups) {
+ res.add(x.scalarOperation(_sop));
+ }
+ return res;
+ }
+ }
}
diff --git
a/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/FederatedPSControlThread.java
b/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/FederatedPSControlThread.java
index 8fa0698..80418ac 100644
---
a/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/FederatedPSControlThread.java
+++
b/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/FederatedPSControlThread.java
@@ -52,6 +52,7 @@ import java.util.stream.Collectors;
import static org.apache.sysds.runtime.util.ProgramConverter.*;
public class FederatedPSControlThread extends PSWorker implements
Callable<Void> {
+ private static final long serialVersionUID = 6846648059569648791L;
FederatedData _featuresData;
FederatedData _labelsData;
final long _batchCounterVarID;
@@ -140,6 +141,7 @@ public class FederatedPSControlThread extends PSWorker
implements Callable<Void>
* Setup UDF executed on the federated worker
*/
private static class setupFederatedWorker extends FederatedUDF {
+ private static final long serialVersionUID =
-3148991224792675607L;
long _batchSize;
long _dataSize;
long _numBatches;
@@ -209,6 +211,8 @@ public class FederatedPSControlThread extends PSWorker
implements Callable<Void>
* Teardown UDF executed on the federated worker
*/
private static class teardownFederatedWorker extends FederatedUDF {
+ private static final long serialVersionUID =
-153650281873318969L;
+
protected teardownFederatedWorker() {
super(new long[]{});
}
@@ -326,6 +330,8 @@ public class FederatedPSControlThread extends PSWorker
implements Callable<Void>
* This is the code that will be executed on the federated Worker when
computing a single batch
*/
private static class federatedComputeBatchGradients extends
FederatedUDF {
+ private static final long serialVersionUID =
-3652112393963053475L;
+
protected federatedComputeBatchGradients(long[] inIDs) {
super(inIDs);
}
@@ -438,6 +444,8 @@ public class FederatedPSControlThread extends PSWorker
implements Callable<Void>
* This is the code that will be executed on the federated Worker when
computing one epoch
*/
private static class federatedComputeEpochGradients extends
FederatedUDF {
+ private static final long serialVersionUID =
-3075901536748794832L;
+
protected federatedComputeEpochGradients(long[] inIDs) {
super(inIDs);
}
diff --git
a/src/main/java/org/apache/sysds/runtime/privacy/finegrained/FineGrainedPrivacyList.java
b/src/main/java/org/apache/sysds/runtime/privacy/finegrained/FineGrainedPrivacyList.java
index 54cb639..2fb1297 100644
---
a/src/main/java/org/apache/sysds/runtime/privacy/finegrained/FineGrainedPrivacyList.java
+++
b/src/main/java/org/apache/sysds/runtime/privacy/finegrained/FineGrainedPrivacyList.java
@@ -165,21 +165,13 @@ public class FineGrainedPrivacyList implements
FineGrainedPrivacy {
}
private boolean listEquals(ArrayList<Map.Entry<DataRange,PrivacyLevel>>
otherFGP){
- if ( otherFGP.size() == constraintCollection.size() ){
- for ( Map.Entry<DataRange, PrivacyLevel> constraint :
constraintCollection){
- if ( !innerEquals(constraint, otherFGP) )
- return false;
- }
- return true;
- } else return false;
- }
-
- private boolean innerEquals(Map.Entry<DataRange, PrivacyLevel>
constraint, ArrayList<Map.Entry<DataRange,PrivacyLevel>> otherFGP){
- for (Map.Entry<DataRange, PrivacyLevel> otherConstraint :
otherFGP){
- if ( constraint.equals(otherConstraint) )
- return true;
+ if ( otherFGP.size() != constraintCollection.size() )
+ return false;
+ for ( Map.Entry<DataRange, PrivacyLevel> constraint :
constraintCollection){
+ if ( !otherFGP.contains(constraint) )
+ return false;
}
- return false;
+ return true;
}
@Override
diff --git
a/src/main/java/org/apache/sysds/runtime/privacy/propagation/PrivacyPropagator.java
b/src/main/java/org/apache/sysds/runtime/privacy/propagation/PrivacyPropagator.java
index 2776a49..49d17fa 100644
---
a/src/main/java/org/apache/sysds/runtime/privacy/propagation/PrivacyPropagator.java
+++
b/src/main/java/org/apache/sysds/runtime/privacy/propagation/PrivacyPropagator.java
@@ -22,7 +22,6 @@ package org.apache.sysds.runtime.privacy.propagation;
import java.util.*;
import org.apache.sysds.parser.DataExpression;
-import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysds.runtime.instructions.Instruction;
import org.apache.sysds.runtime.instructions.cp.*;
diff --git
a/src/test/java/org/apache/sysds/test/component/paramserv/SerializationTest.java
b/src/test/java/org/apache/sysds/test/component/paramserv/SerializationTest.java
index bf47f19..0fd172d 100644
---
a/src/test/java/org/apache/sysds/test/component/paramserv/SerializationTest.java
+++
b/src/test/java/org/apache/sysds/test/component/paramserv/SerializationTest.java
@@ -27,7 +27,6 @@ import java.io.ObjectInputStream;
import java.util.Arrays;
import java.util.Collection;
-import org.apache.sysds.runtime.DMLRuntimeException;
import org.junit.Assert;
import org.junit.Test;
import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
@@ -44,11 +43,8 @@ public class SerializationTest {
private int _named;
@Parameterized.Parameters
- public static Collection named() {
- return Arrays.asList(new Object[][] {
- { 0 },
- { 1 }
- });
+ public static Collection<?> named() {
+ return Arrays.asList(new Object[][] {{ 0 }, { 1 }});
}
public SerializationTest(Integer named) {
diff --git
a/src/test/java/org/apache/sysds/test/functions/privacy/ReadWriteTest.java
b/src/test/java/org/apache/sysds/test/functions/privacy/ReadWriteTest.java
index 6b0f941..dea5773 100644
--- a/src/test/java/org/apache/sysds/test/functions/privacy/ReadWriteTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/privacy/ReadWriteTest.java
@@ -133,7 +133,7 @@ public class ReadWriteTest extends AutomatedTestBase {
return a;
}
- private void setFineGrained(PrivacyConstraint privacyConstraint){
+ private static void setFineGrained(PrivacyConstraint privacyConstraint){
FineGrainedPrivacy fgp =
privacyConstraint.getFineGrainedPrivacy();
fgp.put(new DataRange(new long[]{1,2}, new long[]{5,4}),
PrivacyLevel.Private);
fgp.put(new DataRange(new long[]{7,1}, new long[]{9,1}),
PrivacyLevel.Private);
diff --git
a/src/test/java/org/apache/sysds/test/functions/privacy/propagation/AppendPropagatorTest.java
b/src/test/java/org/apache/sysds/test/functions/privacy/propagation/AppendPropagatorTest.java
index 560db59..41edec0 100644
---
a/src/test/java/org/apache/sysds/test/functions/privacy/propagation/AppendPropagatorTest.java
+++
b/src/test/java/org/apache/sysds/test/functions/privacy/propagation/AppendPropagatorTest.java
@@ -19,24 +19,26 @@
package org.apache.sysds.test.functions.privacy.propagation;
-import org.apache.sysds.api.DMLScript;
-import org.apache.sysds.common.Types;
-import org.apache.sysds.parser.DataExpression;
-import org.apache.sysds.runtime.instructions.cp.*;
+import org.apache.sysds.runtime.instructions.cp.Data;
+import org.apache.sysds.runtime.instructions.cp.DoubleObject;
+import org.apache.sysds.runtime.instructions.cp.IntObject;
+import org.apache.sysds.runtime.instructions.cp.ListObject;
+import org.apache.sysds.runtime.instructions.cp.ScalarObject;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.meta.MatrixCharacteristics;
import org.apache.sysds.runtime.privacy.PrivacyConstraint;
import org.apache.sysds.runtime.privacy.PrivacyConstraint.PrivacyLevel;
-import org.apache.sysds.runtime.privacy.PrivacyUtils;
import org.apache.sysds.runtime.privacy.finegrained.DataRange;
-import org.apache.sysds.runtime.privacy.finegrained.FineGrainedPrivacy;
-import org.apache.sysds.runtime.privacy.propagation.*;
+import org.apache.sysds.runtime.privacy.propagation.AppendPropagator;
+import org.apache.sysds.runtime.privacy.propagation.CBindPropagator;
+import org.apache.sysds.runtime.privacy.propagation.ListAppendPropagator;
+import org.apache.sysds.runtime.privacy.propagation.ListRemovePropagator;
+import org.apache.sysds.runtime.privacy.propagation.Propagator;
+import org.apache.sysds.runtime.privacy.propagation.PropagatorMultiReturn;
+import org.apache.sysds.runtime.privacy.propagation.RBindPropagator;
import org.apache.sysds.test.AutomatedTestBase;
import org.apache.sysds.test.TestConfiguration;
import org.apache.sysds.test.TestUtils;
-import
org.apache.sysds.test.functions.federated.primitives.FederatedRCBindTest;
-import org.apache.wink.json4j.JSONException;
-import org.apache.wink.json4j.JSONObject;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
@@ -46,8 +48,6 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
-import static org.junit.Assert.assertEquals;
-
public class AppendPropagatorTest extends AutomatedTestBase {
private final static String TEST_DIR = "functions/privacy/";
@@ -100,23 +100,6 @@ public class AppendPropagatorTest extends
AutomatedTestBase {
generalOnlyRBindTest(new
PrivacyConstraint(PrivacyLevel.Private), new
PrivacyConstraint(PrivacyLevel.PrivateAggregation));
}
- private void generalOnlyRBindTest(PrivacyConstraint constraint1,
PrivacyConstraint constraint2){
- int columns = 2;
- int rows1 = 4;
- int rows2 = 3;
- MatrixBlock inputMatrix1 = new MatrixBlock(rows1,columns,3);
- MatrixBlock inputMatrix2 = new MatrixBlock(rows2,columns,4);
- AppendPropagator propagator = new RBindPropagator(inputMatrix1,
constraint1, inputMatrix2, constraint2);
- PrivacyConstraint mergedConstraint = propagator.propagate();
- Assert.assertEquals(mergedConstraint.getPrivacyLevel(),
PrivacyLevel.None);
- Map<DataRange, PrivacyLevel> firstHalfPrivacy =
mergedConstraint.getFineGrainedPrivacy().getPrivacyLevel(
- new DataRange(new long[]{0,0}, new
long[]{rows1-1,columns-1}));
- firstHalfPrivacy.forEach((range,level) ->
Assert.assertEquals(constraint1.getPrivacyLevel(),level));
- Map<DataRange, PrivacyLevel> secondHalfPrivacy =
mergedConstraint.getFineGrainedPrivacy().getPrivacyLevel(
- new DataRange(new long[]{rows1,0}, new
long[]{rows1+rows2-1,columns-1}));
- secondHalfPrivacy.forEach((range,level) ->
Assert.assertEquals(constraint2.getPrivacyLevel(),level));
- }
-
@Test
public void generalOnlyCBindPrivate1Test(){
generalOnlyCBindTest(new
PrivacyConstraint(PrivacyLevel.Private), new PrivacyConstraint());
@@ -152,23 +135,6 @@ public class AppendPropagatorTest extends
AutomatedTestBase {
generalOnlyCBindTest(new
PrivacyConstraint(PrivacyLevel.Private), new
PrivacyConstraint(PrivacyLevel.PrivateAggregation));
}
- private void generalOnlyCBindTest(PrivacyConstraint constraint1,
PrivacyConstraint constraint2){
- int rows = 2;
- int columns1 = 4;
- int columns2 = 3;
- MatrixBlock inputMatrix1 = new MatrixBlock(rows,columns1,3);
- MatrixBlock inputMatrix2 = new MatrixBlock(rows,columns2,4);
- AppendPropagator propagator = new CBindPropagator(inputMatrix1,
constraint1, inputMatrix2, constraint2);
- PrivacyConstraint mergedConstraint = propagator.propagate();
- Assert.assertEquals(mergedConstraint.getPrivacyLevel(),
PrivacyLevel.None);
- Map<DataRange, PrivacyLevel> firstHalfPrivacy =
mergedConstraint.getFineGrainedPrivacy().getPrivacyLevel(
- new DataRange(new long[]{0,0}, new
long[]{rows-1,columns1-1}));
- firstHalfPrivacy.forEach((range,level) ->
Assert.assertEquals(constraint1.getPrivacyLevel(),level));
- Map<DataRange, PrivacyLevel> secondHalfPrivacy =
mergedConstraint.getFineGrainedPrivacy().getPrivacyLevel(
- new DataRange(new long[]{0,columns1}, new
long[]{rows,columns1+columns2-1}));
- secondHalfPrivacy.forEach((range,level) ->
Assert.assertEquals(constraint2.getPrivacyLevel(),level));
- }
-
@Test
public void generalOnlyListAppendPrivate1Test(){
generalOnlyListAppendTest(new
PrivacyConstraint(PrivacyLevel.Private), new PrivacyConstraint());
@@ -204,25 +170,6 @@ public class AppendPropagatorTest extends
AutomatedTestBase {
generalOnlyListAppendTest(new
PrivacyConstraint(PrivacyLevel.Private), new
PrivacyConstraint(PrivacyLevel.PrivateAggregation));
}
- private void generalOnlyListAppendTest(PrivacyConstraint constraint1,
PrivacyConstraint constraint2){
- int length1 = 6;
- List<Data> dataList1 = Arrays.asList(new Data[length1]);
- ListObject input1 = new ListObject(dataList1);
- int length2 = 11;
- List<Data> dataList2 = Arrays.asList(new Data[length2]);
- ListObject input2 = new ListObject(dataList2);
- Propagator propagator = new ListAppendPropagator(input1,
constraint1, input2, constraint2);
- PrivacyConstraint mergedConstraint = propagator.propagate();
- Map<DataRange, PrivacyLevel> firstHalfPrivacy =
mergedConstraint.getFineGrainedPrivacy().getPrivacyLevel(
- new DataRange(new long[]{0}, new long[]{length1-1})
- );
- firstHalfPrivacy.forEach((range,level) ->
Assert.assertEquals(constraint1.getPrivacyLevel(),level));
- Map<DataRange, PrivacyLevel> secondHalfPrivacy =
mergedConstraint.getFineGrainedPrivacy().getPrivacyLevel(
- new DataRange(new long[length1], new
long[]{length1+length2-1})
- );
- secondHalfPrivacy.forEach((range,level) ->
Assert.assertEquals(constraint2.getPrivacyLevel(),level));
- }
-
@Test
public void generalOnlyListRemoveAppendPrivate1Test(){
generalOnlyListRemoveAppendTest(new
PrivacyConstraint(PrivacyLevel.Private), new PrivacyConstraint(),
@@ -265,27 +212,6 @@ public class AppendPropagatorTest extends
AutomatedTestBase {
PrivacyLevel.Private, PrivacyLevel.Private);
}
- private void generalOnlyListRemoveAppendTest(
- PrivacyConstraint constraint1, PrivacyConstraint constraint2,
PrivacyLevel expected1, PrivacyLevel expected2){
- int dataLength = 9;
- List<Data> dataList = new ArrayList<>();
- for ( int i = 0; i < dataLength; i++){
- dataList.add(new DoubleObject(i));
- }
- ListObject inputList = new ListObject(dataList);
-
- int removePositionInt = 5;
- ScalarObject removePosition = new IntObject(removePositionInt);
-
- PropagatorMultiReturn propagator = new
ListRemovePropagator(inputList, constraint1, removePosition, constraint2);
- PrivacyConstraint[] mergedConstraints = propagator.propagate();
-
- Assert.assertEquals(expected1,
mergedConstraints[0].getPrivacyLevel());
- Assert.assertEquals(expected2,
mergedConstraints[1].getPrivacyLevel());
- Assert.assertFalse("The first output constraint should have no
fine-grained constraints", mergedConstraints[0].hasFineGrainedConstraints());
- Assert.assertFalse("The second output constraint should have no
fine-grained constraints", mergedConstraints[1].hasFineGrainedConstraints());
- }
-
@Test
public void finegrainedRBindPrivate1(){
PrivacyConstraint constraint1 = new PrivacyConstraint();
@@ -339,30 +265,7 @@ public class AppendPropagatorTest extends
AutomatedTestBase {
constraint2.getFineGrainedPrivacy().put(new DataRange(new
long[]{1,0},new long[]{2,0}), PrivacyLevel.PrivateAggregation);
finegrainedRBindTest(constraint1, constraint2);
}
-
- private void finegrainedRBindTest(PrivacyConstraint constraint1,
PrivacyConstraint constraint2){
- int columns = 2;
- int rows1 = 4;
- int rows2 = 3;
- MatrixBlock inputMatrix1 = new MatrixBlock(rows1,columns,3);
- MatrixBlock inputMatrix2 = new MatrixBlock(rows2,columns,4);
- AppendPropagator propagator = new RBindPropagator(inputMatrix1,
constraint1, inputMatrix2, constraint2);
- PrivacyConstraint mergedConstraint = propagator.propagate();
- Assert.assertEquals(mergedConstraint.getPrivacyLevel(),
PrivacyLevel.None);
- Map<DataRange, PrivacyLevel> firstHalfPrivacy =
mergedConstraint.getFineGrainedPrivacy().getPrivacyLevel(
- new DataRange(new long[]{0,0}, new
long[]{rows1-1,columns-1}));
-
constraint1.getFineGrainedPrivacy().getAllConstraintsList().forEach(
- constraint -> Assert.assertTrue("Merged constraint
should contain same privacy levels as input 1",
-
firstHalfPrivacy.containsValue(constraint.getValue()))
- );
- Map<DataRange, PrivacyLevel> secondHalfPrivacy =
mergedConstraint.getFineGrainedPrivacy().getPrivacyLevel(
- new DataRange(new long[]{rows1,0}, new
long[]{rows1+rows2-1,columns-1}));
-
constraint2.getFineGrainedPrivacy().getAllConstraintsList().forEach(
- constraint -> Assert.assertTrue("Merged constraint
should contain same privacy levels as input 2",
-
secondHalfPrivacy.containsValue(constraint.getValue()))
- );
- }
-
+
@Test
public void finegrainedCBindPrivate1(){
PrivacyConstraint constraint1 = new PrivacyConstraint();
@@ -417,29 +320,6 @@ public class AppendPropagatorTest extends
AutomatedTestBase {
finegrainedCBindTest(constraint1, constraint2);
}
- private void finegrainedCBindTest(PrivacyConstraint constraint1,
PrivacyConstraint constraint2){
- int rows = 6;
- int columns1 = 4;
- int columns2 = 3;
- MatrixBlock inputMatrix1 = new MatrixBlock(rows,columns1,3);
- MatrixBlock inputMatrix2 = new MatrixBlock(rows,columns2,4);
- AppendPropagator propagator = new CBindPropagator(inputMatrix1,
constraint1, inputMatrix2, constraint2);
- PrivacyConstraint mergedConstraint = propagator.propagate();
- Assert.assertEquals(mergedConstraint.getPrivacyLevel(),
PrivacyLevel.None);
- Map<DataRange, PrivacyLevel> firstHalfPrivacy =
mergedConstraint.getFineGrainedPrivacy().getPrivacyLevel(
- new DataRange(new long[]{0,0}, new
long[]{rows-1,columns1-1}));
-
constraint1.getFineGrainedPrivacy().getAllConstraintsList().forEach(
- constraint -> Assert.assertTrue("Merged constraint
should contain same privacy levels as input 1",
-
firstHalfPrivacy.containsValue(constraint.getValue()))
- );
- Map<DataRange, PrivacyLevel> secondHalfPrivacy =
mergedConstraint.getFineGrainedPrivacy().getPrivacyLevel(
- new DataRange(new long[]{0,columns1}, new
long[]{rows,columns1+columns2-1}));
-
constraint2.getFineGrainedPrivacy().getAllConstraintsList().forEach(
- constraint -> Assert.assertTrue("Merged constraint
should contain same privacy levels as input 2",
-
secondHalfPrivacy.containsValue(constraint.getValue()))
- );
- }
-
@Test
public void finegrainedListAppendPrivate1(){
PrivacyConstraint constraint1 = new PrivacyConstraint();
@@ -494,39 +374,12 @@ public class AppendPropagatorTest extends
AutomatedTestBase {
finegrainedListAppendTest(constraint1, constraint2);
}
- private void finegrainedListAppendTest(PrivacyConstraint constraint1,
PrivacyConstraint constraint2){
- int length1 = 6;
- List<Data> dataList1 = Arrays.asList(new Data[length1]);
- ListObject input1 = new ListObject(dataList1);
- int length2 = 11;
- List<Data> dataList2 = Arrays.asList(new Data[length2]);
- ListObject input2 = new ListObject(dataList2);
- Propagator propagator = new ListAppendPropagator(input1,
constraint1, input2, constraint2);
- PrivacyConstraint mergedConstraint = propagator.propagate();
- Assert.assertEquals(mergedConstraint.getPrivacyLevel(),
PrivacyLevel.None);
- Map<DataRange, PrivacyLevel> firstHalfPrivacy =
mergedConstraint.getFineGrainedPrivacy().getPrivacyLevel(
- new DataRange(new long[]{0}, new long[]{length1-1})
- );
-
constraint1.getFineGrainedPrivacy().getAllConstraintsList().forEach(
- constraint -> Assert.assertTrue("Merged constraint
should contain same privacy levels as input 1",
-
firstHalfPrivacy.containsValue(constraint.getValue()))
- );
- Map<DataRange, PrivacyLevel> secondHalfPrivacy =
mergedConstraint.getFineGrainedPrivacy().getPrivacyLevel(
- new DataRange(new long[]{length1}, new
long[]{length1+length2-1})
- );
-
constraint2.getFineGrainedPrivacy().getAllConstraintsList().forEach(
- constraint -> Assert.assertTrue("Merged constraint
should contain same privacy levels as input 2",
-
secondHalfPrivacy.containsValue(constraint.getValue()))
- );
- }
-
@Test
public void testFunction(){
int dataLength = 9;
List<Data> dataList = new ArrayList<>();
- for ( int i = 0; i < dataLength; i++){
+ for ( int i = 0; i < dataLength; i++)
dataList.add(new DoubleObject(i));
- }
ListObject l = new ListObject(dataList);
ListObject lCopy = l.copy();
int position = 4;
@@ -591,38 +444,6 @@ public class AppendPropagatorTest extends
AutomatedTestBase {
finegrainedListRemoveAppendTest(constraint1, constraint2,
PrivacyLevel.PrivateAggregation);
}
- private void finegrainedListRemoveAppendTest(
- PrivacyConstraint constraint1, PrivacyConstraint constraint2,
PrivacyLevel expectedOutput2){
- finegrainedListRemoveAppendTest(constraint1, constraint2,
expectedOutput2, false);
- }
-
- private void finegrainedListRemoveAppendTest(
- PrivacyConstraint constraint1, PrivacyConstraint constraint2,
PrivacyLevel expectedOutput2, boolean singleElementPrivacy){
- int dataLength = 9;
- List<Data> dataList = new ArrayList<>();
- for ( int i = 0; i < dataLength; i++){
- dataList.add(new DoubleObject(i));
- }
- ListObject inputList = new ListObject(dataList);
- int removePositionInt = 5;
- ScalarObject removePosition = new IntObject(removePositionInt);
- PropagatorMultiReturn propagator = new
ListRemovePropagator(inputList, constraint1, removePosition, constraint2);
- PrivacyConstraint[] mergedConstraints = propagator.propagate();
-
- if ( !singleElementPrivacy ){
- Map<DataRange, PrivacyLevel> outputPrivacy =
mergedConstraints[0].getFineGrainedPrivacy().getPrivacyLevel(
- new DataRange(new long[]{0}, new
long[]{dataLength-1})
- );
-
constraint1.getFineGrainedPrivacy().getAllConstraintsList().forEach(
- constraint -> Assert.assertTrue("Merged
constraint should contain same privacy levels as input 1",
-
outputPrivacy.containsValue(constraint.getValue()))
- );
- }
-
- Assert.assertEquals(expectedOutput2,
mergedConstraints[1].getPrivacyLevel());
-
Assert.assertFalse(mergedConstraints[1].hasFineGrainedConstraints());
- }
-
@Test
public void integrationRBindTestNoneNone(){
PrivacyConstraint pc1 = new
PrivacyConstraint(PrivacyLevel.None);
@@ -865,26 +686,6 @@ public class AppendPropagatorTest extends
AutomatedTestBase {
integrationCBindTest(constraint1, constraint2, pcExpected);
}
- private void integrationCBindTest(PrivacyConstraint privacyConstraint1,
PrivacyConstraint privacyConstraint2,
- PrivacyConstraint expectedOutput){
- TestConfiguration config =
getAndLoadTestConfiguration(TEST_NAME_CBIND);
- fullDMLScriptName = SCRIPT_DIR + TEST_DIR +
config.getTestScript() + ".dml";
-
- int cols1 = 20;
- int cols2 = 30;
- int rows = 10;
- double[][] A = getRandomMatrix(rows, cols1, -10, 10, 0.5, 1);
- double[][] B = getRandomMatrix(rows, cols2, -10, 10, 0.5, 1);
- writeInputMatrixWithMTD("A", A, false, new
MatrixCharacteristics(rows, cols1), privacyConstraint1);
- writeInputMatrixWithMTD("B", B, false, new
MatrixCharacteristics(rows, cols2), privacyConstraint2);
-
- programArgs = new String[]{"-nvargs", "A=" + input("A"), "B=" +
input("B"), "C=" + output("C")};
- runTest(true,false,null,-1);
-
- PrivacyConstraint outputConstraint =
getPrivacyConstraintFromMetaData("C");
- Assert.assertEquals(expectedOutput, outputConstraint);
- }
-
@Test
public void integrationStringAppendTestNoneNone(){
PrivacyConstraint pc1 = new
PrivacyConstraint(PrivacyLevel.None);
@@ -920,25 +721,6 @@ public class AppendPropagatorTest extends
AutomatedTestBase {
integrationStringAppendTest(pc1, pc2, pc2);
}
- private void integrationStringAppendTest(PrivacyConstraint
privacyConstraint1, PrivacyConstraint privacyConstraint2,
- PrivacyConstraint expectedOutput){
- TestConfiguration config =
getAndLoadTestConfiguration(TEST_NAME_STRING);
- fullDMLScriptName = SCRIPT_DIR + TEST_DIR +
config.getTestScript() + ".dml";
-
- int cols = 1;
- int rows = 1;
- double[][] A = getRandomMatrix(rows, cols, -10, 10, 0.5, 1);
- double[][] B = getRandomMatrix(rows, cols, -10, 10, 0.5, 1);
- writeInputMatrixWithMTD("A", A, false, new
MatrixCharacteristics(rows, cols), privacyConstraint1);
- writeInputMatrixWithMTD("B", B, false, new
MatrixCharacteristics(rows, cols), privacyConstraint2);
-
- programArgs = new String[]{"-nvargs", "A=" + input("A"), "B=" +
input("B"), "C=" + output("C")};
- runTest(true,false,null,-1);
-
- PrivacyConstraint outputConstraint =
getPrivacyConstraintFromMetaData("C");
- Assert.assertEquals(expectedOutput, outputConstraint);
- }
-
@Ignore
@Test
public void integrationListAppendTestNoneNone(){
@@ -981,6 +763,223 @@ public class AppendPropagatorTest extends
AutomatedTestBase {
integrationListAppendTest(pc1, pc2, pc2);
}
+ private static void generalOnlyRBindTest(PrivacyConstraint constraint1,
PrivacyConstraint constraint2){
+ int columns = 2;
+ int rows1 = 4;
+ int rows2 = 3;
+ MatrixBlock inputMatrix1 = new MatrixBlock(rows1,columns,3);
+ MatrixBlock inputMatrix2 = new MatrixBlock(rows2,columns,4);
+ AppendPropagator propagator = new RBindPropagator(inputMatrix1,
constraint1, inputMatrix2, constraint2);
+ PrivacyConstraint mergedConstraint = propagator.propagate();
+ Assert.assertEquals(mergedConstraint.getPrivacyLevel(),
PrivacyLevel.None);
+ Map<DataRange, PrivacyLevel> firstHalfPrivacy =
mergedConstraint.getFineGrainedPrivacy().getPrivacyLevel(
+ new DataRange(new long[]{0,0}, new
long[]{rows1-1,columns-1}));
+ firstHalfPrivacy.forEach((range,level) ->
Assert.assertEquals(constraint1.getPrivacyLevel(),level));
+ Map<DataRange, PrivacyLevel> secondHalfPrivacy =
mergedConstraint.getFineGrainedPrivacy().getPrivacyLevel(
+ new DataRange(new long[]{rows1,0}, new
long[]{rows1+rows2-1,columns-1}));
+ secondHalfPrivacy.forEach((range,level) ->
Assert.assertEquals(constraint2.getPrivacyLevel(),level));
+ }
+
+ private static void generalOnlyCBindTest(PrivacyConstraint constraint1,
PrivacyConstraint constraint2){
+ int rows = 2;
+ int columns1 = 4;
+ int columns2 = 3;
+ MatrixBlock inputMatrix1 = new MatrixBlock(rows,columns1,3);
+ MatrixBlock inputMatrix2 = new MatrixBlock(rows,columns2,4);
+ AppendPropagator propagator = new CBindPropagator(inputMatrix1,
constraint1, inputMatrix2, constraint2);
+ PrivacyConstraint mergedConstraint = propagator.propagate();
+ Assert.assertEquals(mergedConstraint.getPrivacyLevel(),
PrivacyLevel.None);
+ Map<DataRange, PrivacyLevel> firstHalfPrivacy =
mergedConstraint.getFineGrainedPrivacy().getPrivacyLevel(
+ new DataRange(new long[]{0,0}, new
long[]{rows-1,columns1-1}));
+ firstHalfPrivacy.forEach((range,level) ->
Assert.assertEquals(constraint1.getPrivacyLevel(),level));
+ Map<DataRange, PrivacyLevel> secondHalfPrivacy =
mergedConstraint.getFineGrainedPrivacy().getPrivacyLevel(
+ new DataRange(new long[]{0,columns1}, new
long[]{rows,columns1+columns2-1}));
+ secondHalfPrivacy.forEach((range,level) ->
Assert.assertEquals(constraint2.getPrivacyLevel(),level));
+ }
+
+ private static void generalOnlyListAppendTest(PrivacyConstraint
constraint1, PrivacyConstraint constraint2){
+ int length1 = 6;
+ List<Data> dataList1 = Arrays.asList(new Data[length1]);
+ ListObject input1 = new ListObject(dataList1);
+ int length2 = 11;
+ List<Data> dataList2 = Arrays.asList(new Data[length2]);
+ ListObject input2 = new ListObject(dataList2);
+ Propagator propagator = new ListAppendPropagator(input1,
constraint1, input2, constraint2);
+ PrivacyConstraint mergedConstraint = propagator.propagate();
+ Map<DataRange, PrivacyLevel> firstHalfPrivacy =
mergedConstraint.getFineGrainedPrivacy().getPrivacyLevel(
+ new DataRange(new long[]{0}, new long[]{length1-1})
+ );
+ firstHalfPrivacy.forEach((range,level) ->
Assert.assertEquals(constraint1.getPrivacyLevel(),level));
+ Map<DataRange, PrivacyLevel> secondHalfPrivacy =
mergedConstraint.getFineGrainedPrivacy().getPrivacyLevel(
+ new DataRange(new long[length1], new
long[]{length1+length2-1})
+ );
+ secondHalfPrivacy.forEach((range,level) ->
Assert.assertEquals(constraint2.getPrivacyLevel(),level));
+ }
+
+ private static void generalOnlyListRemoveAppendTest(
+ PrivacyConstraint constraint1, PrivacyConstraint constraint2,
PrivacyLevel expected1, PrivacyLevel expected2){
+ int dataLength = 9;
+ List<Data> dataList = new ArrayList<>();
+ for ( int i = 0; i < dataLength; i++){
+ dataList.add(new DoubleObject(i));
+ }
+ ListObject inputList = new ListObject(dataList);
+
+ int removePositionInt = 5;
+ ScalarObject removePosition = new IntObject(removePositionInt);
+
+ PropagatorMultiReturn propagator = new
ListRemovePropagator(inputList, constraint1, removePosition, constraint2);
+ PrivacyConstraint[] mergedConstraints = propagator.propagate();
+
+ Assert.assertEquals(expected1,
mergedConstraints[0].getPrivacyLevel());
+ Assert.assertEquals(expected2,
mergedConstraints[1].getPrivacyLevel());
+ Assert.assertFalse("The first output constraint should have no
fine-grained constraints", mergedConstraints[0].hasFineGrainedConstraints());
+ Assert.assertFalse("The second output constraint should have no
fine-grained constraints", mergedConstraints[1].hasFineGrainedConstraints());
+ }
+
+ private static void finegrainedRBindTest(PrivacyConstraint constraint1,
PrivacyConstraint constraint2){
+ int columns = 2;
+ int rows1 = 4;
+ int rows2 = 3;
+ MatrixBlock inputMatrix1 = new MatrixBlock(rows1,columns,3);
+ MatrixBlock inputMatrix2 = new MatrixBlock(rows2,columns,4);
+ AppendPropagator propagator = new RBindPropagator(inputMatrix1,
constraint1, inputMatrix2, constraint2);
+ PrivacyConstraint mergedConstraint = propagator.propagate();
+ Assert.assertEquals(mergedConstraint.getPrivacyLevel(),
PrivacyLevel.None);
+ Map<DataRange, PrivacyLevel> firstHalfPrivacy =
mergedConstraint.getFineGrainedPrivacy().getPrivacyLevel(
+ new DataRange(new long[]{0,0}, new
long[]{rows1-1,columns-1}));
+
constraint1.getFineGrainedPrivacy().getAllConstraintsList().forEach(
+ constraint -> Assert.assertTrue("Merged constraint
should contain same privacy levels as input 1",
+
firstHalfPrivacy.containsValue(constraint.getValue()))
+ );
+ Map<DataRange, PrivacyLevel> secondHalfPrivacy =
mergedConstraint.getFineGrainedPrivacy().getPrivacyLevel(
+ new DataRange(new long[]{rows1,0}, new
long[]{rows1+rows2-1,columns-1}));
+
constraint2.getFineGrainedPrivacy().getAllConstraintsList().forEach(
+ constraint -> Assert.assertTrue("Merged constraint
should contain same privacy levels as input 2",
+
secondHalfPrivacy.containsValue(constraint.getValue()))
+ );
+ }
+
+ private static void finegrainedCBindTest(PrivacyConstraint constraint1,
PrivacyConstraint constraint2){
+ int rows = 6;
+ int columns1 = 4;
+ int columns2 = 3;
+ MatrixBlock inputMatrix1 = new MatrixBlock(rows,columns1,3);
+ MatrixBlock inputMatrix2 = new MatrixBlock(rows,columns2,4);
+ AppendPropagator propagator = new CBindPropagator(inputMatrix1,
constraint1, inputMatrix2, constraint2);
+ PrivacyConstraint mergedConstraint = propagator.propagate();
+ Assert.assertEquals(mergedConstraint.getPrivacyLevel(),
PrivacyLevel.None);
+ Map<DataRange, PrivacyLevel> firstHalfPrivacy =
mergedConstraint.getFineGrainedPrivacy().getPrivacyLevel(
+ new DataRange(new long[]{0,0}, new
long[]{rows-1,columns1-1}));
+
constraint1.getFineGrainedPrivacy().getAllConstraintsList().forEach(
+ constraint -> Assert.assertTrue("Merged constraint
should contain same privacy levels as input 1",
+
firstHalfPrivacy.containsValue(constraint.getValue()))
+ );
+ Map<DataRange, PrivacyLevel> secondHalfPrivacy =
mergedConstraint.getFineGrainedPrivacy().getPrivacyLevel(
+ new DataRange(new long[]{0,columns1}, new
long[]{rows,columns1+columns2-1}));
+
constraint2.getFineGrainedPrivacy().getAllConstraintsList().forEach(
+ constraint -> Assert.assertTrue("Merged constraint
should contain same privacy levels as input 2",
+
secondHalfPrivacy.containsValue(constraint.getValue()))
+ );
+ }
+
+ private static void finegrainedListAppendTest(PrivacyConstraint
constraint1, PrivacyConstraint constraint2){
+ int length1 = 6;
+ List<Data> dataList1 = Arrays.asList(new Data[length1]);
+ ListObject input1 = new ListObject(dataList1);
+ int length2 = 11;
+ List<Data> dataList2 = Arrays.asList(new Data[length2]);
+ ListObject input2 = new ListObject(dataList2);
+ Propagator propagator = new ListAppendPropagator(input1,
constraint1, input2, constraint2);
+ PrivacyConstraint mergedConstraint = propagator.propagate();
+ Assert.assertEquals(mergedConstraint.getPrivacyLevel(),
PrivacyLevel.None);
+ Map<DataRange, PrivacyLevel> firstHalfPrivacy =
mergedConstraint.getFineGrainedPrivacy().getPrivacyLevel(
+ new DataRange(new long[]{0}, new long[]{length1-1})
+ );
+
constraint1.getFineGrainedPrivacy().getAllConstraintsList().forEach(
+ constraint -> Assert.assertTrue("Merged constraint
should contain same privacy levels as input 1",
+
firstHalfPrivacy.containsValue(constraint.getValue()))
+ );
+ Map<DataRange, PrivacyLevel> secondHalfPrivacy =
mergedConstraint.getFineGrainedPrivacy().getPrivacyLevel(
+ new DataRange(new long[]{length1}, new
long[]{length1+length2-1})
+ );
+
constraint2.getFineGrainedPrivacy().getAllConstraintsList().forEach(
+ constraint -> Assert.assertTrue("Merged constraint
should contain same privacy levels as input 2",
+
secondHalfPrivacy.containsValue(constraint.getValue()))
+ );
+ }
+
+ private static void finegrainedListRemoveAppendTest(
+ PrivacyConstraint constraint1, PrivacyConstraint constraint2,
PrivacyLevel expectedOutput2){
+ finegrainedListRemoveAppendTest(constraint1, constraint2,
expectedOutput2, false);
+ }
+
+ private static void finegrainedListRemoveAppendTest(
+ PrivacyConstraint constraint1, PrivacyConstraint constraint2,
PrivacyLevel expectedOutput2, boolean singleElementPrivacy){
+ int dataLength = 9;
+ List<Data> dataList = new ArrayList<>();
+ for ( int i = 0; i < dataLength; i++){
+ dataList.add(new DoubleObject(i));
+ }
+ ListObject inputList = new ListObject(dataList);
+ int removePositionInt = 5;
+ ScalarObject removePosition = new IntObject(removePositionInt);
+ PropagatorMultiReturn propagator = new
ListRemovePropagator(inputList, constraint1, removePosition, constraint2);
+ PrivacyConstraint[] mergedConstraints = propagator.propagate();
+
+ if ( !singleElementPrivacy ){
+ Map<DataRange, PrivacyLevel> outputPrivacy =
mergedConstraints[0].getFineGrainedPrivacy().getPrivacyLevel(
+ new DataRange(new long[]{0}, new
long[]{dataLength-1})
+ );
+
constraint1.getFineGrainedPrivacy().getAllConstraintsList().forEach(
+ constraint -> Assert.assertTrue("Merged
constraint should contain same privacy levels as input 1",
+
outputPrivacy.containsValue(constraint.getValue()))
+ );
+ }
+
+ Assert.assertEquals(expectedOutput2,
mergedConstraints[1].getPrivacyLevel());
+
Assert.assertFalse(mergedConstraints[1].hasFineGrainedConstraints());
+ }
+
+ private void integrationCBindTest(PrivacyConstraint privacyConstraint1,
PrivacyConstraint privacyConstraint2,
+ PrivacyConstraint expectedOutput){
+ TestConfiguration config =
getAndLoadTestConfiguration(TEST_NAME_CBIND);
+ fullDMLScriptName = SCRIPT_DIR + TEST_DIR +
config.getTestScript() + ".dml";
+
+ int cols1 = 20;
+ int cols2 = 30;
+ int rows = 10;
+ double[][] A = getRandomMatrix(rows, cols1, -10, 10, 0.5, 1);
+ double[][] B = getRandomMatrix(rows, cols2, -10, 10, 0.5, 1);
+ writeInputMatrixWithMTD("A", A, false, new
MatrixCharacteristics(rows, cols1), privacyConstraint1);
+ writeInputMatrixWithMTD("B", B, false, new
MatrixCharacteristics(rows, cols2), privacyConstraint2);
+
+ programArgs = new String[]{"-nvargs", "A=" + input("A"), "B=" +
input("B"), "C=" + output("C")};
+ runTest(true,false,null,-1);
+
+ PrivacyConstraint outputConstraint =
getPrivacyConstraintFromMetaData("C");
+ Assert.assertEquals(expectedOutput, outputConstraint);
+ }
+
+ private void integrationStringAppendTest(PrivacyConstraint
privacyConstraint1, PrivacyConstraint privacyConstraint2,
+ PrivacyConstraint expectedOutput){
+ TestConfiguration config =
getAndLoadTestConfiguration(TEST_NAME_STRING);
+ fullDMLScriptName = SCRIPT_DIR + TEST_DIR +
config.getTestScript() + ".dml";
+
+ int cols = 1;
+ int rows = 1;
+ double[][] A = getRandomMatrix(rows, cols, -10, 10, 0.5, 1);
+ double[][] B = getRandomMatrix(rows, cols, -10, 10, 0.5, 1);
+ writeInputMatrixWithMTD("A", A, false, new
MatrixCharacteristics(rows, cols), privacyConstraint1);
+ writeInputMatrixWithMTD("B", B, false, new
MatrixCharacteristics(rows, cols), privacyConstraint2);
+
+ programArgs = new String[]{"-nvargs", "A=" + input("A"), "B=" +
input("B"), "C=" + output("C")};
+ runTest(true,false,null,-1);
+
+ PrivacyConstraint outputConstraint =
getPrivacyConstraintFromMetaData("C");
+ Assert.assertEquals(expectedOutput, outputConstraint);
+ }
+
private void integrationListAppendTest(PrivacyConstraint
privacyConstraint1, PrivacyConstraint privacyConstraint2,
PrivacyConstraint expectedOutput){
TestConfiguration config =
getAndLoadTestConfiguration(TEST_NAME_LIST);