This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/master by this push:
new a677fcd  [MINOR] Fix compressed unary aggregates (in-place binarycell ops)
a677fcd is described below
commit a677fcd5de7de8e46f87eec13833aedbe5e870bd
Author: Matthias Boehm <[email protected]>
AuthorDate: Sat Feb 13 02:15:57 2021 +0100
[MINOR] Fix compressed unary aggregates (in-place binarycell ops)
The compressed unary aggregates used the normal binary element-wise
operations for an update-in-place accumulation, passing the aggregate
output as both an input and the output, which broke the contract of
these operations. This is now replaced with the appropriate binary
in-place operations. Furthermore, the formatting of this file used
spaces instead of tabs, which is now also fixed.
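
For context, the core call-site change in reduceColOverlappingFutures
comes down to the following two shapes (a sketch only; the method names
and argument order are taken from the diff below, while 'partial' and
'plusOp' are hypothetical local names for a finished task's result block
and the accumulation operator):

    // before: generic binary cell op, with 'ret' aliased as both an
    // input operand and the output target
    LibMatrixBincell.bincellOp(partial, ret, ret, plusOp);

    // after: dedicated in-place binary cell op that accumulates the
    // partial result directly into 'ret'
    LibMatrixBincell.bincellOpInPlace(ret, partial, plusOp);

As in the code below, plusOp is a BinaryOperator around
Plus.getPlusFnObject() for Kahan-style sums and op.aggOp.increOp
otherwise.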
---
.../sysds/runtime/compress/lib/LibCompAgg.java | 1000 ++++++++++----------
1 file changed, 499 insertions(+), 501 deletions(-)
diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/LibCompAgg.java b/src/main/java/org/apache/sysds/runtime/compress/lib/LibCompAgg.java
index 437cf97..d3ab8b7 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/lib/LibCompAgg.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/lib/LibCompAgg.java
@@ -53,505 +53,503 @@ import org.apache.sysds.runtime.util.CommonThreadPool;
public class LibCompAgg {
- // private static final Log LOG = LogFactory.getLog(LibCompAgg.class.getName());
-
- // private static final long MIN_PAR_AGG_THRESHOLD = 8 * 1024 * 1024;
- private static final long MIN_PAR_AGG_THRESHOLD = 8;
-
- private static ThreadLocal<MatrixBlock> memPool = new ThreadLocal<MatrixBlock>() {
-  @Override
-  protected MatrixBlock initialValue() {
-   return null;
-  }
- };
-
- public static MatrixBlock aggregateUnary(CompressedMatrixBlock inputMatrix, MatrixBlock outputMatrix,
-  AggregateUnaryOperator op, int blen, MatrixIndexes indexesIn, boolean inCP) {
- if(inputMatrix.getColGroups() != null) {
- fillStart(outputMatrix, op);
-
-   if(inputMatrix.isOverlapping() &&
-    (op.aggOp.increOp.fn instanceof KahanPlusSq || (op.aggOp.increOp.fn instanceof Builtin &&
-    (((Builtin) op.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MIN ||
-    ((Builtin) op.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MAX))))
-    aggregateUnaryOverlapping(inputMatrix, outputMatrix, op, indexesIn, inCP);
-   else
-    aggregateUnaryNormalCompressedMatrixBlock(inputMatrix, outputMatrix, op, blen, indexesIn, inCP);
- }
-
- if(op.aggOp.existsCorrection() && inCP)
- outputMatrix.dropLastRowsOrColumns(op.aggOp.correction);
-
- outputMatrix.recomputeNonZeros();
-
- return outputMatrix;
- }
-
- private static void aggregateUnaryNormalCompressedMatrixBlock(CompressedMatrixBlock inputMatrix,
-  MatrixBlock outputMatrix, AggregateUnaryOperator op, int blen, MatrixIndexes indexesIn, boolean inCP) {
-
- aggregateUncompressedColGroups(inputMatrix, outputMatrix, op);
-
- if(isValidForParallelProcessing(inputMatrix, op))
- tryToAggregateInParallel(inputMatrix, outputMatrix, op);
- else
- aggregateSingleThreaded(inputMatrix, outputMatrix, op);
-
- postProcessAggregate(inputMatrix, outputMatrix, op);
-
- }
-
- private static boolean isValidForParallelProcessing(CompressedMatrixBlock m1, AggregateUnaryOperator op) {
-  return op.getNumThreads() > 1 && m1.getExactSizeOnDisk() > MIN_PAR_AGG_THRESHOLD;
- }
-
- private static void aggregateUncompressedColGroups(CompressedMatrixBlock m1, MatrixBlock ret,
- AggregateUnaryOperator op) {
- ColGroupUncompressed uc = m1.getUncompressedColGroup();
- if(uc != null)
- uc.unaryAggregateOperations(op, ret);
- }
-
- private static void tryToAggregateInParallel(CompressedMatrixBlock m1, MatrixBlock ret, AggregateUnaryOperator op) {
-  int k = op.getNumThreads();
-  if(k == 1)
-   aggregateUnaryOperations(op, m1.getColGroups(), ret, 0, m1.getNumRows(), m1.getNumColumns());
- else
- aggregateInParallel(m1, ret, op, k);
-
- }
-
- private static void aggregateInParallel(CompressedMatrixBlock m1, MatrixBlock ret, AggregateUnaryOperator op,
- int k) {
-
- ExecutorService pool = CommonThreadPool.get(op.getNumThreads());
- ArrayList<UnaryAggregateTask> tasks = new ArrayList<>();
- try {
- // compute all compressed column groups
- if(op.indexFn instanceof ReduceCol) {
- final int blkz = CompressionSettings.BITMAP_BLOCK_SZ;
-    // int blklen = Math.max((int) Math.ceil((double) m1.getNumRows() / (op.getNumThreads() * 2)), blkz);
-    // blklen += (blklen % blkz != 0) ? blkz - blklen % blkz : 0;
-    int blklen = blkz * 4;
-    for(int i = 0; i * blklen < m1.getNumRows(); i++)
-     tasks.add(new UnaryAggregateTask(m1.getColGroups(), ret, i * blklen,
-      Math.min((i + 1) * blklen, m1.getNumRows()), op, m1.getNumColumns()));
-
- }
- else {
-    List<List<ColGroup>> grpParts = createTaskPartitionNotIncludingUncompressable(m1.getColGroups(), k);
-    for(List<ColGroup> grp : grpParts)
-     tasks.add(new UnaryAggregateTask(grp, ret, 0, m1.getNumRows(), op, m1.getNumColumns(),
- m1.isOverlapping()));
- }
-
- List<Future<MatrixBlock>> futures = pool.invokeAll(tasks);
- pool.shutdown();
-
- // aggregate partial results
- if(op.indexFn instanceof ReduceAll)
- if(op.aggOp.increOp.fn instanceof Builtin)
- aggregateResults(ret, futures, op);
- else
- sumResults(ret, futures);
- else if(op.indexFn instanceof ReduceRow && m1.isOverlapping()) {
- if(op.aggOp.increOp.fn instanceof Builtin)
- aggregateResultVectors(ret, futures, op);
- else
- sumResultVectors(ret, futures);
- }
- else
- for(Future<MatrixBlock> f : futures)
- f.get();
- }
- catch(InterruptedException | ExecutionException e) {
- throw new DMLRuntimeException(e);
- }
- }
-
- private static void sumResults(MatrixBlock ret, List<Future<MatrixBlock>> futures)
- throws InterruptedException, ExecutionException {
- double val = ret.quickGetValue(0, 0);
- for(Future<MatrixBlock> rtask : futures) {
- double tmp = rtask.get().quickGetValue(0, 0);
- val = val + tmp;
- }
- ret.quickSetValue(0, 0, val);
-
- }
-
- private static void sumResultVectors(MatrixBlock ret, List<Future<MatrixBlock>> futures)
- throws InterruptedException, ExecutionException {
-
- double[] retVals = ret.getDenseBlockValues();
- for(Future<MatrixBlock> rtask : futures) {
- double[] taskResult = rtask.get().getDenseBlockValues();
- for(int i = 0; i < retVals.length; i++) {
- retVals[i] += taskResult[i];
- }
- }
- ret.setNonZeros(ret.getNumColumns());
- }
-
- private static void aggregateResults(MatrixBlock ret, List<Future<MatrixBlock>> futures, AggregateUnaryOperator op)
- throws InterruptedException, ExecutionException {
- double val = ret.quickGetValue(0, 0);
- for(Future<MatrixBlock> rtask : futures) {
- double tmp = rtask.get().quickGetValue(0, 0);
- val = op.aggOp.increOp.fn.execute(val, tmp);
- }
- ret.quickSetValue(0, 0, val);
- }
-
- private static void aggregateResultVectors(MatrixBlock ret, List<Future<MatrixBlock>> futures,
-  AggregateUnaryOperator op) throws InterruptedException, ExecutionException {
- double[] retVals = ret.getDenseBlockValues();
- for(Future<MatrixBlock> rtask : futures) {
- double[] taskResult = rtask.get().getDenseBlockValues();
- for(int i = 0; i < retVals.length; i++) {
-    retVals[i] = op.aggOp.increOp.fn.execute(retVals[i] , taskResult[i]);
- }
- }
- ret.setNonZeros(ret.getNumColumns());
- }
-
- private static void aggregateSingleThreaded(CompressedMatrixBlock m1, MatrixBlock ret, AggregateUnaryOperator op) {
-  aggregateUnaryOperations(op, m1.getColGroups(), ret, 0, m1.getNumRows(), m1.getNumColumns());
- }
-
- private static void divideByNumberOfCellsForMean(CompressedMatrixBlock m1, MatrixBlock ret, IndexFunction idxFn) {
- if(idxFn instanceof ReduceAll)
- divideByNumberOfCellsForMeanAll(m1, ret);
- else if(idxFn instanceof ReduceCol)
- divideByNumberOfCellsForMeanRows(m1, ret);
- else if(idxFn instanceof ReduceRow)
- divideByNumberOfCellsForMeanCols(m1, ret);
- }
-
- private static void divideByNumberOfCellsForMeanRows(CompressedMatrixBlock m1, MatrixBlock ret) {
-  for(int i = 0; i < m1.getNumRows(); i++) {
-   ret.quickSetValue(i, 0, ret.quickGetValue(i, 0) / m1.getNumColumns());
- }
- }
-
- private static void divideByNumberOfCellsForMeanCols(CompressedMatrixBlock m1, MatrixBlock ret) {
- for(int i = 0; i < m1.getNumColumns(); i++) {
- ret.quickSetValue(0, i, ret.quickGetValue(0, i) / m1.getNumRows());
- }
- }
-
- private static void divideByNumberOfCellsForMeanAll(CompressedMatrixBlock m1, MatrixBlock ret) {
-  ret.quickSetValue(0, 0, ret.quickGetValue(0, 0) / (m1.getNumColumns() * m1.getNumRows()));
- }
-
- private static void postProcessAggregate(CompressedMatrixBlock m1, MatrixBlock ret, AggregateUnaryOperator op) {
- if(op.aggOp.increOp.fn instanceof Mean)
- divideByNumberOfCellsForMean(m1, ret, op.indexFn);
-
- }
-
- private static void aggregateUnaryOverlapping(CompressedMatrixBlock m1, MatrixBlock ret, AggregateUnaryOperator op,
- MatrixIndexes indexesIn, boolean inCP) {
- try {
-   List<Future<MatrixBlock>> rtasks = generateUnaryAggregateOverlappingFutures(m1, ret, op);
- reduceOverlappingFutures(rtasks, ret, op);
- }
- catch(InterruptedException | ExecutionException e) {
- throw new DMLRuntimeException(e);
- }
-
- }
-
- private static void reduceOverlappingFutures(List<Future<MatrixBlock>> rtasks, MatrixBlock ret,
-  AggregateUnaryOperator op) throws InterruptedException, ExecutionException {
- if(isReduceAll(ret, op.indexFn))
- reduceAllOverlappingFutures(rtasks, ret, op);
- else if(op.indexFn instanceof ReduceRow)
- reduceColOverlappingFutures(rtasks, ret, op);
- else
- reduceRowOverlappingFutures(rtasks, ret, op);
- }
-
- private static void reduceColOverlappingFutures(List<Future<MatrixBlock>> rtasks, MatrixBlock ret,
-  AggregateUnaryOperator op) throws InterruptedException, ExecutionException {
-  for(Future<MatrixBlock> rtask : rtasks) {
-   LibMatrixBincell.bincellOp(rtask.get(),
-    ret,
-    ret,
-    (op.aggOp.increOp.fn instanceof KahanFunction) ? new BinaryOperator(
-     Plus.getPlusFnObject()) : op.aggOp.increOp);
- }
- }
-
- private static void reduceRowOverlappingFutures(List<Future<MatrixBlock>> rtasks, MatrixBlock ret,
-  AggregateUnaryOperator op) throws InterruptedException, ExecutionException {
- for(Future<MatrixBlock> rtask : rtasks) {
- rtask.get();
- }
- }
-
- private static boolean isReduceAll(MatrixBlock ret, IndexFunction idxFn) {
-  return idxFn instanceof ReduceAll || (ret.getNumColumns() == 1 && ret.getNumRows() == 1);
- }
-
- private static void reduceAllOverlappingFutures(List<Future<MatrixBlock>> rtasks, MatrixBlock ret,
-  AggregateUnaryOperator op) throws InterruptedException, ExecutionException {
-
- if(op.aggOp.increOp.fn instanceof KahanFunction) {
- KahanObject kbuff = new KahanObject(ret.quickGetValue(0, 0), 0);
- KahanPlus kplus = KahanPlus.getKahanPlusFnObject();
- for(Future<MatrixBlock> rtask : rtasks) {
- double tmp = rtask.get().quickGetValue(0, 0);
-
- kplus.execute2(kbuff, tmp);
- }
- ret.quickSetValue(0, 0, kbuff._sum);
- }
- else {
- double val = ret.quickGetValue(0, 0);
- for(Future<MatrixBlock> rtask : rtasks) {
- double tmp = rtask.get().quickGetValue(0, 0);
- val = op.aggOp.increOp.fn.execute(val, tmp);
-
- }
- ret.quickSetValue(0, 0, val);
- }
- }
-
- private static List<Future<MatrixBlock>> generateUnaryAggregateOverlappingFutures(CompressedMatrixBlock m1,
-  MatrixBlock ret, AggregateUnaryOperator op) throws InterruptedException {
-
- ExecutorService pool = CommonThreadPool.get(op.getNumThreads());
- ArrayList<UnaryAggregateOverlappingTask> tasks = new ArrayList<>();
-
-  final int blklen = Math.min(m1.getNumRows() / op.getNumThreads(), CompressionSettings.BITMAP_BLOCK_SZ);
-
- for(int i = 0; i * blklen < m1.getNumRows(); i++)
- tasks.add(new UnaryAggregateOverlappingTask(m1, ret, i * blklen,
- Math.min((i + 1) * blklen, m1.getNumRows()), op));
-
- List<Future<MatrixBlock>> futures = pool.invokeAll(tasks);
- pool.shutdown();
- return futures;
- }
-
- private static List<List<ColGroup>> createTaskPartitionNotIncludingUncompressable(List<ColGroup> colGroups, int k) {
- int numTasks = Math.min(k, colGroups.size());
- List<List<ColGroup>> grpParts = new ArrayList<>();
- for(int i = 0; i < numTasks; i++) {
- grpParts.add(new ArrayList<>());
- }
- int pos = 0;
- for(ColGroup grp : colGroups) {
- if(!(grp instanceof ColGroupUncompressed)) {
- List<ColGroup> g = grpParts.get(pos);
- g.add(grp);
- pos = (pos + 1) % numTasks;
- }
- }
-
- return grpParts;
- }
-
- private static void aggregateUnaryOperations(AggregateUnaryOperator op, List<ColGroup> groups, MatrixBlock ret,
-  int rl, int ru, int numColumns) {
-  if(op.indexFn instanceof ReduceCol && op.aggOp.increOp.fn instanceof Builtin)
-   aggregateUnaryBuiltinRowOperation(op, groups, ret, rl, ru, numColumns);
- else
- aggregateUnaryNormalOperation(op, groups, ret, rl, ru, numColumns);
- }
-
- private static void aggregateUnaryNormalOperation(AggregateUnaryOperator op, List<ColGroup> groups, MatrixBlock ret,
- int rl, int ru, int numColumns) {
- for(ColGroup grp : groups)
- grp.unaryAggregateOperations(op, ret, rl, ru);
-
- }
-
- private static void aggregateUnaryBuiltinRowOperation(AggregateUnaryOperator op, List<ColGroup> groups,
- MatrixBlock ret, int rl, int ru, int numColumns) {
-
- int[] rnnz = null;
- int numberDenseColumns = 0;
- for(ColGroup grp : groups) {
- grp.unaryAggregateOperations(op, ret, rl, ru);
- if(grp.isDense())
- numberDenseColumns += grp.getNumCols();
- else{
- if (rnnz == null)
- rnnz = new int[ru - rl];
- grp.countNonZerosPerRow(rnnz, rl, ru);
- }
- }
- if(rnnz != null)
- for(int row = rl; row < ru; row++)
- if(rnnz[row-rl] + numberDenseColumns < numColumns)
-     ret.quickSetValue(row, 0, op.aggOp.increOp.fn.execute(ret.quickGetValue(row, 0), 0.0));
-
- }
-
- private static void fillStart(MatrixBlock ret, AggregateUnaryOperator op) {
- if(op.aggOp.increOp.fn instanceof Builtin) {
- Double val = null;
- switch(((Builtin) op.aggOp.increOp.fn).getBuiltinCode()) {
- case MAX:
- val = Double.NEGATIVE_INFINITY;
- break;
- case MIN:
- val = Double.POSITIVE_INFINITY;
- break;
- default:
- break;
- }
- if(val != null) {
- ret.getDenseBlock().set(val);
- }
- }
- }
-
- private static class UnaryAggregateTask implements Callable<MatrixBlock> {
- private final List<ColGroup> _groups;
- private final int _rl;
- private final int _ru;
- private final MatrixBlock _ret;
- private final int _numColumns;
- private final AggregateUnaryOperator _op;
-
-  protected UnaryAggregateTask(List<ColGroup> groups, MatrixBlock ret, int rl, int ru, AggregateUnaryOperator op,
- int numColumns) {
- _groups = groups;
- _op = op;
- _rl = rl;
- _ru = ru;
- _numColumns = numColumns;
-
- if(_op.indexFn instanceof ReduceAll) { // sum
- _ret = new MatrixBlock(1, 1, false);
- _ret.allocateDenseBlock();
- if(_op.aggOp.increOp.fn instanceof Builtin)
- System.arraycopy(ret.getDenseBlockValues(),
- 0,
- _ret.getDenseBlockValues(),
- 0,
- ret.getNumRows() * ret.getNumColumns());
- }
- else // colSums / rowSums
- _ret = ret;
-
- }
-
-  protected UnaryAggregateTask(List<ColGroup> groups, MatrixBlock ret, int rl, int ru, AggregateUnaryOperator op,
- int numColumns, boolean overlapping) {
- _groups = groups;
- _op = op;
- _rl = rl;
- _ru = ru;
- _numColumns = numColumns;
-
-   if(_op.indexFn instanceof ReduceAll || (_op.indexFn instanceof ReduceRow && overlapping)) {
-    _ret = new MatrixBlock(ret.getNumRows(), ret.getNumColumns(), false);
- _ret.allocateDenseBlock();
- if(_op.aggOp.increOp.fn instanceof Builtin)
- System.arraycopy(ret.getDenseBlockValues(),
- 0,
- _ret.getDenseBlockValues(),
- 0,
- ret.getNumRows() * ret.getNumColumns());
- }
- else // colSums / rowSums
- _ret = ret;
- }
-
- @Override
- public MatrixBlock call() {
-   aggregateUnaryOperations(_op, _groups, _ret, _rl, _ru, _numColumns);
- return _ret;
- }
- }
-
- private static class UnaryAggregateOverlappingTask implements Callable<MatrixBlock> {
- private final CompressedMatrixBlock _m1;
- private final int _rl;
- private final int _ru;
- private final MatrixBlock _ret;
- private final AggregateUnaryOperator _op;
-
-  protected UnaryAggregateOverlappingTask(CompressedMatrixBlock m1, MatrixBlock ret, int rl, int ru,
- AggregateUnaryOperator op) {
- _m1 = m1;
- _op = op;
- _rl = rl;
- _ru = ru;
- _ret = ret;
-
- }
-
- private MatrixBlock setupOutputMatrix() {
- MatrixBlock outputBlock;
- if(_op.indexFn instanceof ReduceAll)
-    outputBlock = new MatrixBlock(_ret.getNumRows(), _ret.getNumColumns(), false).allocateDenseBlock();
-   else if(_op.indexFn instanceof ReduceCol)
-    outputBlock = new MatrixBlock(_ru - _rl, _ret.getNumColumns(), false).allocateDenseBlock();
-   else
-    outputBlock = new MatrixBlock(_ret.getNumRows(), _ret.getNumColumns(), false).allocateDenseBlock();
-
- if(_op.aggOp.increOp.fn instanceof Builtin)
- if(_op.indexFn instanceof ReduceCol)
- System.arraycopy(_ret.getDenseBlockValues(),
- _rl * _ret.getNumColumns(),
- outputBlock.getDenseBlockValues(),
- 0,
- outputBlock.getDenseBlockValues().length);
- else
- System.arraycopy(_ret.getDenseBlockValues(),
- 0,
- outputBlock.getDenseBlockValues(),
- 0,
- _ret.getDenseBlockValues().length);
-
- return outputBlock;
- }
-
- private MatrixBlock getTmp() {
- MatrixBlock tmp = memPool.get();
- if(tmp == null) {
-    memPool.set(new MatrixBlock(_ru - _rl, _m1.getNumColumns(), false, -1).allocateBlock());
- tmp = memPool.get();
- }
- else {
- tmp = memPool.get();
- tmp.reset(_ru - _rl, _m1.getNumColumns(), false, -1);
- }
- return tmp;
- }
-
- private MatrixBlock decompressToTemp() {
- MatrixBlock tmp = getTmp();
- for(ColGroup g : _m1.getColGroups())
-    g.decompressToBlockSafe(tmp, _rl, _ru, 0, g.getValues(), false);
- tmp.setNonZeros(_rl + _ru);
- return tmp;
- }
-
- @Override
- public MatrixBlock call() {
- MatrixBlock tmp = decompressToTemp();
-
- MatrixBlock outputBlock = setupOutputMatrix();
- LibMatrixAgg.aggregateUnaryMatrix(tmp, outputBlock, _op);
-
- if(_op.indexFn instanceof ReduceCol) {
- double[] retValues = _ret.getDenseBlockValues();
- int currentIndex = _rl * _ret.getNumColumns();
- double[] outputBlockValues = outputBlock.getDenseBlockValues();
-    System.arraycopy(outputBlockValues, 0, retValues, currentIndex, outputBlockValues.length);
-
- return null;
- }
- else {
-
- return outputBlock;
- }
- }
- }
+ // private static final Log LOG =
LogFactory.getLog(LibCompAgg.class.getName());
+
+ // private static final long MIN_PAR_AGG_THRESHOLD = 8 * 1024 * 1024;
+ private static final long MIN_PAR_AGG_THRESHOLD = 8;
+
+ private static ThreadLocal<MatrixBlock> memPool = new
ThreadLocal<MatrixBlock>() {
+ @Override
+ protected MatrixBlock initialValue() {
+ return null;
+ }
+ };
+
+ public static MatrixBlock aggregateUnary(CompressedMatrixBlock inputMatrix, MatrixBlock outputMatrix,
+  AggregateUnaryOperator op, int blen, MatrixIndexes indexesIn, boolean inCP) {
+  if(inputMatrix.getColGroups() != null) {
+   fillStart(outputMatrix, op);
+
+   if(inputMatrix.isOverlapping() &&
+    (op.aggOp.increOp.fn instanceof KahanPlusSq || (op.aggOp.increOp.fn instanceof Builtin &&
+    (((Builtin) op.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MIN ||
+    ((Builtin) op.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MAX))))
+    aggregateUnaryOverlapping(inputMatrix, outputMatrix, op, indexesIn, inCP);
+   else
+    aggregateUnaryNormalCompressedMatrixBlock(inputMatrix, outputMatrix, op, blen, indexesIn, inCP);
+ }
+
+ if(op.aggOp.existsCorrection() && inCP)
+ outputMatrix.dropLastRowsOrColumns(op.aggOp.correction);
+
+ outputMatrix.recomputeNonZeros();
+
+ return outputMatrix;
+ }
+
+ private static void aggregateUnaryNormalCompressedMatrixBlock(CompressedMatrixBlock inputMatrix,
+  MatrixBlock outputMatrix, AggregateUnaryOperator op, int blen, MatrixIndexes indexesIn, boolean inCP) {
+
+ aggregateUncompressedColGroups(inputMatrix, outputMatrix, op);
+
+ if(isValidForParallelProcessing(inputMatrix, op))
+ tryToAggregateInParallel(inputMatrix, outputMatrix, op);
+ else
+ aggregateSingleThreaded(inputMatrix, outputMatrix, op);
+
+ postProcessAggregate(inputMatrix, outputMatrix, op);
+
+ }
+
+ private static boolean isValidForParallelProcessing(CompressedMatrixBlock m1, AggregateUnaryOperator op) {
+  return op.getNumThreads() > 1 && m1.getExactSizeOnDisk() > MIN_PAR_AGG_THRESHOLD;
+ }
+
+ private static void aggregateUncompressedColGroups(CompressedMatrixBlock m1, MatrixBlock ret,
+ AggregateUnaryOperator op) {
+ ColGroupUncompressed uc = m1.getUncompressedColGroup();
+ if(uc != null)
+ uc.unaryAggregateOperations(op, ret);
+ }
+
+ private static void tryToAggregateInParallel(CompressedMatrixBlock m1, MatrixBlock ret, AggregateUnaryOperator op) {
+  int k = op.getNumThreads();
+  if(k == 1)
+   aggregateUnaryOperations(op, m1.getColGroups(), ret, 0, m1.getNumRows(), m1.getNumColumns());
+ else
+ aggregateInParallel(m1, ret, op, k);
+
+ }
+
+ private static void aggregateInParallel(CompressedMatrixBlock m1, MatrixBlock ret, AggregateUnaryOperator op,
+  int k) {
+
+  ExecutorService pool = CommonThreadPool.get(op.getNumThreads());
+  ArrayList<UnaryAggregateTask> tasks = new ArrayList<>();
+  try {
+   // compute all compressed column groups
+   if(op.indexFn instanceof ReduceCol) {
+    final int blkz = CompressionSettings.BITMAP_BLOCK_SZ;
+    // int blklen = Math.max((int) Math.ceil((double) m1.getNumRows() / (op.getNumThreads() * 2)), blkz);
+    // blklen += (blklen % blkz != 0) ? blkz - blklen % blkz : 0;
+    int blklen = blkz * 4;
+    for(int i = 0; i * blklen < m1.getNumRows(); i++)
+     tasks.add(new UnaryAggregateTask(m1.getColGroups(), ret, i * blklen,
+      Math.min((i + 1) * blklen, m1.getNumRows()), op, m1.getNumColumns()));
+
+   }
+   else {
+    List<List<ColGroup>> grpParts = createTaskPartitionNotIncludingUncompressable(m1.getColGroups(), k);
+    for(List<ColGroup> grp : grpParts)
+     tasks.add(new UnaryAggregateTask(grp, ret, 0, m1.getNumRows(), op, m1.getNumColumns(),
+      m1.isOverlapping()));
+ }
+
+   List<Future<MatrixBlock>> futures = pool.invokeAll(tasks);
+ pool.shutdown();
+
+ // aggregate partial results
+ if(op.indexFn instanceof ReduceAll)
+ if(op.aggOp.increOp.fn instanceof Builtin)
+ aggregateResults(ret, futures, op);
+ else
+ sumResults(ret, futures);
+   else if(op.indexFn instanceof ReduceRow && m1.isOverlapping()) {
+    if(op.aggOp.increOp.fn instanceof Builtin)
+     aggregateResultVectors(ret, futures, op);
+ else
+ sumResultVectors(ret, futures);
+ }
+ else
+ for(Future<MatrixBlock> f : futures)
+ f.get();
+ }
+ catch(InterruptedException | ExecutionException e) {
+ throw new DMLRuntimeException(e);
+ }
+ }
+
+ private static void sumResults(MatrixBlock ret, List<Future<MatrixBlock>> futures)
+ throws InterruptedException, ExecutionException {
+ double val = ret.quickGetValue(0, 0);
+ for(Future<MatrixBlock> rtask : futures) {
+ double tmp = rtask.get().quickGetValue(0, 0);
+ val = val + tmp;
+ }
+ ret.quickSetValue(0, 0, val);
+
+ }
+
+ private static void sumResultVectors(MatrixBlock ret, List<Future<MatrixBlock>> futures)
+ throws InterruptedException, ExecutionException {
+
+ double[] retVals = ret.getDenseBlockValues();
+ for(Future<MatrixBlock> rtask : futures) {
+ double[] taskResult = rtask.get().getDenseBlockValues();
+ for(int i = 0; i < retVals.length; i++) {
+ retVals[i] += taskResult[i];
+ }
+ }
+ ret.setNonZeros(ret.getNumColumns());
+ }
+
+ private static void aggregateResults(MatrixBlock ret, List<Future<MatrixBlock>> futures, AggregateUnaryOperator op)
+ throws InterruptedException, ExecutionException {
+ double val = ret.quickGetValue(0, 0);
+ for(Future<MatrixBlock> rtask : futures) {
+ double tmp = rtask.get().quickGetValue(0, 0);
+ val = op.aggOp.increOp.fn.execute(val, tmp);
+ }
+ ret.quickSetValue(0, 0, val);
+ }
+
+ private static void aggregateResultVectors(MatrixBlock ret, List<Future<MatrixBlock>> futures,
+  AggregateUnaryOperator op) throws InterruptedException, ExecutionException {
+ double[] retVals = ret.getDenseBlockValues();
+ for(Future<MatrixBlock> rtask : futures) {
+ double[] taskResult = rtask.get().getDenseBlockValues();
+ for(int i = 0; i < retVals.length; i++) {
+    retVals[i] = op.aggOp.increOp.fn.execute(retVals[i] , taskResult[i]);
+ }
+ }
+ ret.setNonZeros(ret.getNumColumns());
+ }
+
+ private static void aggregateSingleThreaded(CompressedMatrixBlock m1, MatrixBlock ret, AggregateUnaryOperator op) {
+  aggregateUnaryOperations(op, m1.getColGroups(), ret, 0, m1.getNumRows(), m1.getNumColumns());
+ }
+
+ private static void divideByNumberOfCellsForMean(CompressedMatrixBlock m1, MatrixBlock ret, IndexFunction idxFn) {
+ if(idxFn instanceof ReduceAll)
+ divideByNumberOfCellsForMeanAll(m1, ret);
+ else if(idxFn instanceof ReduceCol)
+ divideByNumberOfCellsForMeanRows(m1, ret);
+ else if(idxFn instanceof ReduceRow)
+ divideByNumberOfCellsForMeanCols(m1, ret);
+ }
+
+ private static void divideByNumberOfCellsForMeanRows(CompressedMatrixBlock m1, MatrixBlock ret) {
+ for(int i = 0; i < m1.getNumRows(); i++) {
+ ret.quickSetValue(i, 0, ret.quickGetValue(i, 0) /
m1.getNumColumns());
+ }
+ }
+
+ private static void divideByNumberOfCellsForMeanCols(CompressedMatrixBlock m1, MatrixBlock ret) {
+ for(int i = 0; i < m1.getNumColumns(); i++) {
+ ret.quickSetValue(0, i, ret.quickGetValue(0, i) /
m1.getNumRows());
+ }
+ }
+
+ private static void divideByNumberOfCellsForMeanAll(CompressedMatrixBlock m1, MatrixBlock ret) {
+  ret.quickSetValue(0, 0, ret.quickGetValue(0, 0) / (m1.getNumColumns() * m1.getNumRows()));
+ }
+
+ private static void postProcessAggregate(CompressedMatrixBlock m1, MatrixBlock ret, AggregateUnaryOperator op) {
+ if(op.aggOp.increOp.fn instanceof Mean)
+ divideByNumberOfCellsForMean(m1, ret, op.indexFn);
+
+ }
+
+ private static void aggregateUnaryOverlapping(CompressedMatrixBlock m1, MatrixBlock ret, AggregateUnaryOperator op,
+ MatrixIndexes indexesIn, boolean inCP) {
+ try {
+   List<Future<MatrixBlock>> rtasks = generateUnaryAggregateOverlappingFutures(m1, ret, op);
+ reduceOverlappingFutures(rtasks, ret, op);
+ }
+ catch(InterruptedException | ExecutionException e) {
+ throw new DMLRuntimeException(e);
+ }
+
+ }
+
+ private static void reduceOverlappingFutures(List<Future<MatrixBlock>> rtasks, MatrixBlock ret,
+  AggregateUnaryOperator op) throws InterruptedException, ExecutionException {
+ if(isReduceAll(ret, op.indexFn))
+ reduceAllOverlappingFutures(rtasks, ret, op);
+ else if(op.indexFn instanceof ReduceRow)
+ reduceColOverlappingFutures(rtasks, ret, op);
+ else
+ reduceRowOverlappingFutures(rtasks, ret, op);
+ }
+
+ private static void reduceColOverlappingFutures(List<Future<MatrixBlock>> rtasks, MatrixBlock ret,
+  AggregateUnaryOperator op) throws InterruptedException, ExecutionException {
+  for(Future<MatrixBlock> rtask : rtasks) {
+   LibMatrixBincell.bincellOpInPlace(ret, rtask.get(),
+    (op.aggOp.increOp.fn instanceof KahanFunction) ? new BinaryOperator(
+     Plus.getPlusFnObject()) : op.aggOp.increOp);
+  }
+ }
+
+ private static void reduceRowOverlappingFutures(List<Future<MatrixBlock>> rtasks, MatrixBlock ret,
+  AggregateUnaryOperator op) throws InterruptedException, ExecutionException {
+ for(Future<MatrixBlock> rtask : rtasks) {
+ rtask.get();
+ }
+ }
+
+ private static boolean isReduceAll(MatrixBlock ret, IndexFunction idxFn) {
+  return idxFn instanceof ReduceAll || (ret.getNumColumns() == 1 && ret.getNumRows() == 1);
+ }
+
+ private static void reduceAllOverlappingFutures(List<Future<MatrixBlock>> rtasks, MatrixBlock ret,
+  AggregateUnaryOperator op) throws InterruptedException, ExecutionException {
+
+ if(op.aggOp.increOp.fn instanceof KahanFunction) {
+   KahanObject kbuff = new KahanObject(ret.quickGetValue(0, 0), 0);
+ KahanPlus kplus = KahanPlus.getKahanPlusFnObject();
+ for(Future<MatrixBlock> rtask : rtasks) {
+ double tmp = rtask.get().quickGetValue(0, 0);
+
+ kplus.execute2(kbuff, tmp);
+ }
+ ret.quickSetValue(0, 0, kbuff._sum);
+ }
+ else {
+ double val = ret.quickGetValue(0, 0);
+ for(Future<MatrixBlock> rtask : rtasks) {
+ double tmp = rtask.get().quickGetValue(0, 0);
+ val = op.aggOp.increOp.fn.execute(val, tmp);
+
+ }
+ ret.quickSetValue(0, 0, val);
+ }
+ }
+
+ private static List<Future<MatrixBlock>> generateUnaryAggregateOverlappingFutures(CompressedMatrixBlock m1,
+  MatrixBlock ret, AggregateUnaryOperator op) throws InterruptedException {
+
+ ExecutorService pool = CommonThreadPool.get(op.getNumThreads());
+  ArrayList<UnaryAggregateOverlappingTask> tasks = new ArrayList<>();
+
+  final int blklen = Math.min(m1.getNumRows() / op.getNumThreads(), CompressionSettings.BITMAP_BLOCK_SZ);
+
+  for(int i = 0; i * blklen < m1.getNumRows(); i++)
+   tasks.add(new UnaryAggregateOverlappingTask(m1, ret, i * blklen,
+    Math.min((i + 1) * blklen, m1.getNumRows()), op));
+
+ List<Future<MatrixBlock>> futures = pool.invokeAll(tasks);
+ pool.shutdown();
+ return futures;
+ }
+
+ private static List<List<ColGroup>> createTaskPartitionNotIncludingUncompressable(List<ColGroup> colGroups, int k) {
+ int numTasks = Math.min(k, colGroups.size());
+ List<List<ColGroup>> grpParts = new ArrayList<>();
+ for(int i = 0; i < numTasks; i++) {
+ grpParts.add(new ArrayList<>());
+ }
+ int pos = 0;
+ for(ColGroup grp : colGroups) {
+ if(!(grp instanceof ColGroupUncompressed)) {
+ List<ColGroup> g = grpParts.get(pos);
+ g.add(grp);
+ pos = (pos + 1) % numTasks;
+ }
+ }
+
+ return grpParts;
+ }
+
+ private static void aggregateUnaryOperations(AggregateUnaryOperator op, List<ColGroup> groups, MatrixBlock ret,
+  int rl, int ru, int numColumns) {
+  if(op.indexFn instanceof ReduceCol && op.aggOp.increOp.fn instanceof Builtin)
+   aggregateUnaryBuiltinRowOperation(op, groups, ret, rl, ru, numColumns);
+  else
+   aggregateUnaryNormalOperation(op, groups, ret, rl, ru, numColumns);
+ }
+
+ private static void aggregateUnaryNormalOperation(AggregateUnaryOperator op, List<ColGroup> groups, MatrixBlock ret,
+ int rl, int ru, int numColumns) {
+ for(ColGroup grp : groups)
+ grp.unaryAggregateOperations(op, ret, rl, ru);
+
+ }
+
+ private static void aggregateUnaryBuiltinRowOperation(AggregateUnaryOperator op, List<ColGroup> groups,
+ MatrixBlock ret, int rl, int ru, int numColumns) {
+
+ int[] rnnz = null;
+ int numberDenseColumns = 0;
+ for(ColGroup grp : groups) {
+ grp.unaryAggregateOperations(op, ret, rl, ru);
+ if(grp.isDense())
+ numberDenseColumns += grp.getNumCols();
+ else{
+ if (rnnz == null)
+ rnnz = new int[ru - rl];
+ grp.countNonZerosPerRow(rnnz, rl, ru);
+ }
+ }
+ if(rnnz != null)
+ for(int row = rl; row < ru; row++)
+    if(rnnz[row-rl] + numberDenseColumns < numColumns)
+     ret.quickSetValue(row, 0, op.aggOp.increOp.fn.execute(ret.quickGetValue(row, 0), 0.0));
+
+ }
+
+ private static void fillStart(MatrixBlock ret, AggregateUnaryOperator
op) {
+ if(op.aggOp.increOp.fn instanceof Builtin) {
+ Double val = null;
+ switch(((Builtin)
op.aggOp.increOp.fn).getBuiltinCode()) {
+ case MAX:
+ val = Double.NEGATIVE_INFINITY;
+ break;
+ case MIN:
+ val = Double.POSITIVE_INFINITY;
+ break;
+ default:
+ break;
+ }
+ if(val != null) {
+ ret.getDenseBlock().set(val);
+ }
+ }
+ }
+
+ private static class UnaryAggregateTask implements Callable<MatrixBlock> {
+ private final List<ColGroup> _groups;
+ private final int _rl;
+ private final int _ru;
+ private final MatrixBlock _ret;
+ private final int _numColumns;
+ private final AggregateUnaryOperator _op;
+
+ protected UnaryAggregateTask(List<ColGroup> groups, MatrixBlock
ret, int rl, int ru, AggregateUnaryOperator op,
+ int numColumns) {
+ _groups = groups;
+ _op = op;
+ _rl = rl;
+ _ru = ru;
+ _numColumns = numColumns;
+
+ if(_op.indexFn instanceof ReduceAll) { // sum
+ _ret = new MatrixBlock(1, 1, false);
+ _ret.allocateDenseBlock();
+ if(_op.aggOp.increOp.fn instanceof Builtin)
+
System.arraycopy(ret.getDenseBlockValues(),
+ 0,
+ _ret.getDenseBlockValues(),
+ 0,
+ ret.getNumRows() *
ret.getNumColumns());
+ }
+ else // colSums / rowSums
+ _ret = ret;
+
+ }
+
+ protected UnaryAggregateTask(List<ColGroup> groups, MatrixBlock
ret, int rl, int ru, AggregateUnaryOperator op,
+ int numColumns, boolean overlapping) {
+ _groups = groups;
+ _op = op;
+ _rl = rl;
+ _ru = ru;
+ _numColumns = numColumns;
+
+ if(_op.indexFn instanceof ReduceAll || (_op.indexFn
instanceof ReduceRow && overlapping)) {
+ _ret = new MatrixBlock(ret.getNumRows(),
ret.getNumColumns(), false);
+ _ret.allocateDenseBlock();
+ if(_op.aggOp.increOp.fn instanceof Builtin)
+
System.arraycopy(ret.getDenseBlockValues(),
+ 0,
+ _ret.getDenseBlockValues(),
+ 0,
+ ret.getNumRows() *
ret.getNumColumns());
+ }
+ else // colSums / rowSums
+ _ret = ret;
+ }
+
+ @Override
+ public MatrixBlock call() {
+ aggregateUnaryOperations(_op, _groups, _ret, _rl, _ru,
_numColumns);
+ return _ret;
+ }
+ }
+
+ private static class UnaryAggregateOverlappingTask implements Callable<MatrixBlock> {
+ private final CompressedMatrixBlock _m1;
+ private final int _rl;
+ private final int _ru;
+ private final MatrixBlock _ret;
+ private final AggregateUnaryOperator _op;
+
+ protected UnaryAggregateOverlappingTask(CompressedMatrixBlock
m1, MatrixBlock ret, int rl, int ru,
+ AggregateUnaryOperator op) {
+ _m1 = m1;
+ _op = op;
+ _rl = rl;
+ _ru = ru;
+ _ret = ret;
+
+ }
+
+  private MatrixBlock setupOutputMatrix() {
+   MatrixBlock outputBlock;
+   if(_op.indexFn instanceof ReduceAll)
+    outputBlock = new MatrixBlock(_ret.getNumRows(), _ret.getNumColumns(), false).allocateDenseBlock();
+   else if(_op.indexFn instanceof ReduceCol)
+    outputBlock = new MatrixBlock(_ru - _rl, _ret.getNumColumns(), false).allocateDenseBlock();
+   else
+    outputBlock = new MatrixBlock(_ret.getNumRows(), _ret.getNumColumns(), false).allocateDenseBlock();
+
+   if(_op.aggOp.increOp.fn instanceof Builtin)
+    if(_op.indexFn instanceof ReduceCol)
+     System.arraycopy(_ret.getDenseBlockValues(),
+      _rl * _ret.getNumColumns(),
+      outputBlock.getDenseBlockValues(),
+      0,
+      outputBlock.getDenseBlockValues().length);
+    else
+     System.arraycopy(_ret.getDenseBlockValues(),
+      0,
+      outputBlock.getDenseBlockValues(),
+      0,
+      _ret.getDenseBlockValues().length);
+
+ return outputBlock;
+ }
+
+ private MatrixBlock getTmp() {
+ MatrixBlock tmp = memPool.get();
+ if(tmp == null) {
+ memPool.set(new MatrixBlock(_ru - _rl,
_m1.getNumColumns(), false, -1).allocateBlock());
+ tmp = memPool.get();
+ }
+ else {
+ tmp = memPool.get();
+ tmp.reset(_ru - _rl, _m1.getNumColumns(),
false, -1);
+ }
+ return tmp;
+ }
+
+ private MatrixBlock decompressToTemp() {
+ MatrixBlock tmp = getTmp();
+ for(ColGroup g : _m1.getColGroups())
+ g.decompressToBlockSafe(tmp, _rl, _ru, 0,
g.getValues(), false);
+ tmp.setNonZeros(_rl + _ru);
+ return tmp;
+ }
+
+  @Override
+  public MatrixBlock call() {
+   MatrixBlock tmp = decompressToTemp();
+
+   MatrixBlock outputBlock = setupOutputMatrix();
+   LibMatrixAgg.aggregateUnaryMatrix(tmp, outputBlock, _op);
+
+   if(_op.indexFn instanceof ReduceCol) {
+    double[] retValues = _ret.getDenseBlockValues();
+    int currentIndex = _rl * _ret.getNumColumns();
+    double[] outputBlockValues = outputBlock.getDenseBlockValues();
+    System.arraycopy(outputBlockValues, 0, retValues, currentIndex, outputBlockValues.length);
+
+ return null;
+ }
+ else {
+
+ return outputBlock;
+ }
+ }
+ }
}