This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new a677fcd  [MINOR] Fix compressed unary aggregates (in-place binarycell ops)
a677fcd is described below

commit a677fcd5de7de8e46f87eec13833aedbe5e870bd
Author: Matthias Boehm <[email protected]>
AuthorDate: Sat Feb 13 02:15:57 2021 +0100

    [MINOR] Fix compressed unary aggregates (in-place binarycell ops)
    
    The compressed unary aggregates used the regular binary element-wise
    operations to update results in place, which violated those operations'
    contract. They are now replaced with the appropriate in-place binary
    operations.
    
    Furthermore, this file was indented with spaces instead of tabs; this
    is now fixed as well.
---
 .../sysds/runtime/compress/lib/LibCompAgg.java     | 1000 ++++++++++----------
 1 file changed, 499 insertions(+), 501 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/LibCompAgg.java b/src/main/java/org/apache/sysds/runtime/compress/lib/LibCompAgg.java
index 437cf97..d3ab8b7 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/lib/LibCompAgg.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/lib/LibCompAgg.java
@@ -53,505 +53,503 @@ import org.apache.sysds.runtime.util.CommonThreadPool;
 
 public class LibCompAgg {
 
-    // private static final Log LOG = 
LogFactory.getLog(LibCompAgg.class.getName());
-
-    // private static final long MIN_PAR_AGG_THRESHOLD = 8 * 1024 * 1024;
-    private static final long MIN_PAR_AGG_THRESHOLD = 8;
-
-    private static ThreadLocal<MatrixBlock> memPool = new 
ThreadLocal<MatrixBlock>() {
-        @Override
-        protected MatrixBlock initialValue() {
-            return null;
-        }
-    };
-
-    public static MatrixBlock aggregateUnary(CompressedMatrixBlock 
inputMatrix, MatrixBlock outputMatrix,
-        AggregateUnaryOperator op, int blen, MatrixIndexes indexesIn, boolean 
inCP) {
-        if(inputMatrix.getColGroups() != null) {
-            fillStart(outputMatrix, op);
-
-            if(inputMatrix.isOverlapping() &&
-                (op.aggOp.increOp.fn instanceof KahanPlusSq || 
(op.aggOp.increOp.fn instanceof Builtin &&
-                    (((Builtin) op.aggOp.increOp.fn).getBuiltinCode() == 
BuiltinCode.MIN ||
-                        ((Builtin) op.aggOp.increOp.fn).getBuiltinCode() == 
BuiltinCode.MAX))))
-                aggregateUnaryOverlapping(inputMatrix, outputMatrix, op, 
indexesIn, inCP);
-            else
-                aggregateUnaryNormalCompressedMatrixBlock(inputMatrix, 
outputMatrix, op, blen, indexesIn, inCP);
-        }
-
-        if(op.aggOp.existsCorrection() && inCP)
-            outputMatrix.dropLastRowsOrColumns(op.aggOp.correction);
-
-        outputMatrix.recomputeNonZeros();
-
-        return outputMatrix;
-    }
-
-    private static void 
aggregateUnaryNormalCompressedMatrixBlock(CompressedMatrixBlock inputMatrix,
-        MatrixBlock outputMatrix, AggregateUnaryOperator op, int blen, 
MatrixIndexes indexesIn, boolean inCP) {
-
-        aggregateUncompressedColGroups(inputMatrix, outputMatrix, op);
-
-        if(isValidForParallelProcessing(inputMatrix, op))
-            tryToAggregateInParallel(inputMatrix, outputMatrix, op);
-        else
-            aggregateSingleThreaded(inputMatrix, outputMatrix, op);
-
-        postProcessAggregate(inputMatrix, outputMatrix, op);
-
-    }
-
-    private static boolean isValidForParallelProcessing(CompressedMatrixBlock 
m1, AggregateUnaryOperator op) {
-        return op.getNumThreads() > 1 && m1.getExactSizeOnDisk() > 
MIN_PAR_AGG_THRESHOLD;
-    }
-
-    private static void aggregateUncompressedColGroups(CompressedMatrixBlock 
m1, MatrixBlock ret,
-        AggregateUnaryOperator op) {
-        ColGroupUncompressed uc = m1.getUncompressedColGroup();
-        if(uc != null)
-            uc.unaryAggregateOperations(op, ret);
-    }
-
-    private static void tryToAggregateInParallel(CompressedMatrixBlock m1, 
MatrixBlock ret, AggregateUnaryOperator op) {
-        int k = op.getNumThreads();
-        if(k == 1)
-            aggregateUnaryOperations(op, m1.getColGroups(), ret, 0, 
m1.getNumRows(), m1.getNumColumns());
-        else
-            aggregateInParallel(m1, ret, op, k);
-
-    }
-
-    private static void aggregateInParallel(CompressedMatrixBlock m1, 
MatrixBlock ret, AggregateUnaryOperator op,
-        int k) {
-
-        ExecutorService pool = CommonThreadPool.get(op.getNumThreads());
-        ArrayList<UnaryAggregateTask> tasks = new ArrayList<>();
-        try {
-            // compute all compressed column groups
-            if(op.indexFn instanceof ReduceCol) {
-                final int blkz = CompressionSettings.BITMAP_BLOCK_SZ;
-                // int blklen = Math.max((int) Math.ceil((double) 
m1.getNumRows() / (op.getNumThreads() * 2)), blkz);
-                // blklen += (blklen % blkz != 0) ? blkz - blklen % blkz : 0;
-                int blklen = blkz * 4;
-                for(int i = 0; i * blklen < m1.getNumRows(); i++)
-                    tasks.add(new UnaryAggregateTask(m1.getColGroups(), ret, i 
* blklen,
-                        Math.min((i + 1) * blklen, m1.getNumRows()), op, 
m1.getNumColumns()));
-
-            }
-            else {
-                List<List<ColGroup>> grpParts = 
createTaskPartitionNotIncludingUncompressable(m1.getColGroups(), k);
-                for(List<ColGroup> grp : grpParts)
-                    tasks.add(new UnaryAggregateTask(grp, ret, 0, 
m1.getNumRows(), op, m1.getNumColumns(),
-                        m1.isOverlapping()));
-            }
-
-            List<Future<MatrixBlock>> futures = pool.invokeAll(tasks);
-            pool.shutdown();
-
-            // aggregate partial results
-            if(op.indexFn instanceof ReduceAll)
-                if(op.aggOp.increOp.fn instanceof Builtin)
-                    aggregateResults(ret, futures, op);
-                else
-                    sumResults(ret, futures);
-            else if(op.indexFn instanceof ReduceRow && m1.isOverlapping()) {
-                if(op.aggOp.increOp.fn instanceof Builtin)
-                    aggregateResultVectors(ret, futures, op);
-                else
-                    sumResultVectors(ret, futures);
-            }
-            else
-                for(Future<MatrixBlock> f : futures)
-                    f.get();
-        }
-        catch(InterruptedException | ExecutionException e) {
-            throw new DMLRuntimeException(e);
-        }
-    }
-
-    private static void sumResults(MatrixBlock ret, List<Future<MatrixBlock>> 
futures)
-        throws InterruptedException, ExecutionException {
-        double val = ret.quickGetValue(0, 0);
-        for(Future<MatrixBlock> rtask : futures) {
-            double tmp = rtask.get().quickGetValue(0, 0);
-            val = val + tmp;
-        }
-        ret.quickSetValue(0, 0, val);
-
-    }
-
-    private static void sumResultVectors(MatrixBlock ret, 
List<Future<MatrixBlock>> futures)
-        throws InterruptedException, ExecutionException {
-
-        double[] retVals = ret.getDenseBlockValues();
-        for(Future<MatrixBlock> rtask : futures) {
-            double[] taskResult = rtask.get().getDenseBlockValues();
-            for(int i = 0; i < retVals.length; i++) {
-                retVals[i] += taskResult[i];
-            }
-        }
-        ret.setNonZeros(ret.getNumColumns());
-    }
-
-    private static void aggregateResults(MatrixBlock ret, 
List<Future<MatrixBlock>> futures, AggregateUnaryOperator op)
-        throws InterruptedException, ExecutionException {
-        double val = ret.quickGetValue(0, 0);
-        for(Future<MatrixBlock> rtask : futures) {
-            double tmp = rtask.get().quickGetValue(0, 0);
-            val = op.aggOp.increOp.fn.execute(val, tmp);
-        }
-        ret.quickSetValue(0, 0, val);
-    }
-
-    private static void aggregateResultVectors(MatrixBlock ret, 
List<Future<MatrixBlock>> futures,
-        AggregateUnaryOperator op) throws InterruptedException, 
ExecutionException {
-        double[] retVals = ret.getDenseBlockValues();
-        for(Future<MatrixBlock> rtask : futures) {
-            double[] taskResult = rtask.get().getDenseBlockValues();
-            for(int i = 0; i < retVals.length; i++) {
-                retVals[i] = op.aggOp.increOp.fn.execute(retVals[i] ,  
taskResult[i]);
-            }
-        }
-        ret.setNonZeros(ret.getNumColumns());
-    }
-
-    private static void aggregateSingleThreaded(CompressedMatrixBlock m1, 
MatrixBlock ret, AggregateUnaryOperator op) {
-        aggregateUnaryOperations(op, m1.getColGroups(), ret, 0, 
m1.getNumRows(), m1.getNumColumns());
-    }
-
-    private static void divideByNumberOfCellsForMean(CompressedMatrixBlock m1, 
MatrixBlock ret, IndexFunction idxFn) {
-        if(idxFn instanceof ReduceAll)
-            divideByNumberOfCellsForMeanAll(m1, ret);
-        else if(idxFn instanceof ReduceCol)
-            divideByNumberOfCellsForMeanRows(m1, ret);
-        else if(idxFn instanceof ReduceRow)
-            divideByNumberOfCellsForMeanCols(m1, ret);
-    }
-
-    private static void divideByNumberOfCellsForMeanRows(CompressedMatrixBlock 
m1, MatrixBlock ret) {
-        for(int i = 0; i < m1.getNumRows(); i++) {
-            ret.quickSetValue(i, 0, ret.quickGetValue(i, 0) / 
m1.getNumColumns());
-        }
-    }
-
-    private static void divideByNumberOfCellsForMeanCols(CompressedMatrixBlock 
m1, MatrixBlock ret) {
-        for(int i = 0; i < m1.getNumColumns(); i++) {
-            ret.quickSetValue(0, i, ret.quickGetValue(0, i) / m1.getNumRows());
-        }
-    }
-
-    private static void divideByNumberOfCellsForMeanAll(CompressedMatrixBlock 
m1, MatrixBlock ret) {
-        ret.quickSetValue(0, 0, ret.quickGetValue(0, 0) / (m1.getNumColumns() 
* m1.getNumRows()));
-    }
-
-    private static void postProcessAggregate(CompressedMatrixBlock m1, 
MatrixBlock ret, AggregateUnaryOperator op) {
-        if(op.aggOp.increOp.fn instanceof Mean)
-            divideByNumberOfCellsForMean(m1, ret, op.indexFn);
-
-    }
-
-    private static void aggregateUnaryOverlapping(CompressedMatrixBlock m1, 
MatrixBlock ret, AggregateUnaryOperator op,
-        MatrixIndexes indexesIn, boolean inCP) {
-        try {
-            List<Future<MatrixBlock>> rtasks = 
generateUnaryAggregateOverlappingFutures(m1, ret, op);
-            reduceOverlappingFutures(rtasks, ret, op);
-        }
-        catch(InterruptedException | ExecutionException e) {
-            throw new DMLRuntimeException(e);
-        }
-
-    }
-
-    private static void reduceOverlappingFutures(List<Future<MatrixBlock>> 
rtasks, MatrixBlock ret,
-        AggregateUnaryOperator op) throws InterruptedException, 
ExecutionException {
-        if(isReduceAll(ret, op.indexFn))
-            reduceAllOverlappingFutures(rtasks, ret, op);
-        else if(op.indexFn instanceof ReduceRow)
-            reduceColOverlappingFutures(rtasks, ret, op);
-        else
-            reduceRowOverlappingFutures(rtasks, ret, op);
-    }
-
-    private static void reduceColOverlappingFutures(List<Future<MatrixBlock>> 
rtasks, MatrixBlock ret,
-        AggregateUnaryOperator op) throws InterruptedException, 
ExecutionException {
-        for(Future<MatrixBlock> rtask : rtasks) {
-            LibMatrixBincell.bincellOp(rtask.get(),
-                ret,
-                ret,
-                (op.aggOp.increOp.fn instanceof KahanFunction) ? new 
BinaryOperator(
-                    Plus.getPlusFnObject()) : op.aggOp.increOp);
-        }
-    }
-
-    private static void reduceRowOverlappingFutures(List<Future<MatrixBlock>> 
rtasks, MatrixBlock ret,
-        AggregateUnaryOperator op) throws InterruptedException, 
ExecutionException {
-        for(Future<MatrixBlock> rtask : rtasks) {
-            rtask.get();
-        }
-    }
-
-    private static boolean isReduceAll(MatrixBlock ret, IndexFunction idxFn) {
-        return idxFn instanceof ReduceAll || (ret.getNumColumns() == 1 && 
ret.getNumRows() == 1);
-    }
-
-    private static void reduceAllOverlappingFutures(List<Future<MatrixBlock>> 
rtasks, MatrixBlock ret,
-        AggregateUnaryOperator op) throws InterruptedException, 
ExecutionException {
-
-        if(op.aggOp.increOp.fn instanceof KahanFunction) {
-            KahanObject kbuff = new KahanObject(ret.quickGetValue(0, 0), 0);
-            KahanPlus kplus = KahanPlus.getKahanPlusFnObject();
-            for(Future<MatrixBlock> rtask : rtasks) {
-                double tmp = rtask.get().quickGetValue(0, 0);
-
-                kplus.execute2(kbuff, tmp);
-            }
-            ret.quickSetValue(0, 0, kbuff._sum);
-        }
-        else {
-            double val = ret.quickGetValue(0, 0);
-            for(Future<MatrixBlock> rtask : rtasks) {
-                double tmp = rtask.get().quickGetValue(0, 0);
-                val = op.aggOp.increOp.fn.execute(val, tmp);
-
-            }
-            ret.quickSetValue(0, 0, val);
-        }
-    }
-
-    private static List<Future<MatrixBlock>> 
generateUnaryAggregateOverlappingFutures(CompressedMatrixBlock m1,
-        MatrixBlock ret, AggregateUnaryOperator op) throws 
InterruptedException {
-
-        ExecutorService pool = CommonThreadPool.get(op.getNumThreads());
-        ArrayList<UnaryAggregateOverlappingTask> tasks = new ArrayList<>();
-
-        final int blklen = Math.min(m1.getNumRows() / op.getNumThreads(), 
CompressionSettings.BITMAP_BLOCK_SZ);
-
-        for(int i = 0; i * blklen < m1.getNumRows(); i++)
-            tasks.add(new UnaryAggregateOverlappingTask(m1, ret, i * blklen,
-                Math.min((i + 1) * blklen, m1.getNumRows()), op));
-
-        List<Future<MatrixBlock>> futures = pool.invokeAll(tasks);
-        pool.shutdown();
-        return futures;
-    }
-
-    private static List<List<ColGroup>> 
createTaskPartitionNotIncludingUncompressable(List<ColGroup> colGroups, int k) {
-        int numTasks = Math.min(k, colGroups.size());
-        List<List<ColGroup>> grpParts = new ArrayList<>();
-        for(int i = 0; i < numTasks; i++) {
-            grpParts.add(new ArrayList<>());
-        }
-        int pos = 0;
-        for(ColGroup grp : colGroups) {
-            if(!(grp instanceof ColGroupUncompressed)) {
-                List<ColGroup> g = grpParts.get(pos);
-                g.add(grp);
-                pos = (pos + 1) % numTasks;
-            }
-        }
-
-        return grpParts;
-    }
-
-    private static void aggregateUnaryOperations(AggregateUnaryOperator op, 
List<ColGroup> groups, MatrixBlock ret,
-        int rl, int ru, int numColumns) {
-        if(op.indexFn instanceof ReduceCol && op.aggOp.increOp.fn instanceof 
Builtin)
-            aggregateUnaryBuiltinRowOperation(op, groups, ret, rl, ru, 
numColumns);
-        else
-            aggregateUnaryNormalOperation(op, groups, ret, rl, ru, numColumns);
-    }
-
-    private static void aggregateUnaryNormalOperation(AggregateUnaryOperator 
op, List<ColGroup> groups, MatrixBlock ret,
-        int rl, int ru, int numColumns) {
-        for(ColGroup grp : groups)
-            grp.unaryAggregateOperations(op, ret, rl, ru);
-
-    }
-
-    private static void 
aggregateUnaryBuiltinRowOperation(AggregateUnaryOperator op, List<ColGroup> 
groups,
-        MatrixBlock ret, int rl, int ru, int numColumns) {
-
-        int[] rnnz = null;
-        int numberDenseColumns = 0;
-        for(ColGroup grp : groups) {
-            grp.unaryAggregateOperations(op, ret, rl, ru);
-            if(grp.isDense())
-                numberDenseColumns += grp.getNumCols();
-            else{
-                if (rnnz == null)
-                    rnnz = new int[ru -  rl];
-                grp.countNonZerosPerRow(rnnz, rl, ru);
-            }
-        }
-        if(rnnz != null)
-            for(int row = rl; row < ru; row++)
-                if(rnnz[row-rl] + numberDenseColumns < numColumns)
-                    ret.quickSetValue(row, 0, 
op.aggOp.increOp.fn.execute(ret.quickGetValue(row, 0), 0.0));
-
-    }
-
-    private static void fillStart(MatrixBlock ret, AggregateUnaryOperator op) {
-        if(op.aggOp.increOp.fn instanceof Builtin) {
-            Double val = null;
-            switch(((Builtin) op.aggOp.increOp.fn).getBuiltinCode()) {
-                case MAX:
-                    val = Double.NEGATIVE_INFINITY;
-                    break;
-                case MIN:
-                    val = Double.POSITIVE_INFINITY;
-                    break;
-                default:
-                    break;
-            }
-            if(val != null) {
-                ret.getDenseBlock().set(val);
-            }
-        }
-    }
-
-    private static class UnaryAggregateTask implements Callable<MatrixBlock> {
-        private final List<ColGroup> _groups;
-        private final int _rl;
-        private final int _ru;
-        private final MatrixBlock _ret;
-        private final int _numColumns;
-        private final AggregateUnaryOperator _op;
-
-        protected UnaryAggregateTask(List<ColGroup> groups, MatrixBlock ret, 
int rl, int ru, AggregateUnaryOperator op,
-            int numColumns) {
-            _groups = groups;
-            _op = op;
-            _rl = rl;
-            _ru = ru;
-            _numColumns = numColumns;
-
-            if(_op.indexFn instanceof ReduceAll) { // sum
-                _ret = new MatrixBlock(1, 1, false);
-                _ret.allocateDenseBlock();
-                if(_op.aggOp.increOp.fn instanceof Builtin)
-                    System.arraycopy(ret.getDenseBlockValues(),
-                        0,
-                        _ret.getDenseBlockValues(),
-                        0,
-                        ret.getNumRows() * ret.getNumColumns());
-            }
-            else // colSums / rowSums
-                _ret = ret;
-
-        }
-
-        protected UnaryAggregateTask(List<ColGroup> groups, MatrixBlock ret, 
int rl, int ru, AggregateUnaryOperator op,
-            int numColumns, boolean overlapping) {
-            _groups = groups;
-            _op = op;
-            _rl = rl;
-            _ru = ru;
-            _numColumns = numColumns;
-
-            if(_op.indexFn instanceof ReduceAll || (_op.indexFn instanceof 
ReduceRow && overlapping)) {
-                _ret = new MatrixBlock(ret.getNumRows(), ret.getNumColumns(), 
false);
-                _ret.allocateDenseBlock();
-                if(_op.aggOp.increOp.fn instanceof Builtin)
-                    System.arraycopy(ret.getDenseBlockValues(),
-                        0,
-                        _ret.getDenseBlockValues(),
-                        0,
-                        ret.getNumRows() * ret.getNumColumns());
-            }
-            else // colSums / rowSums
-                _ret = ret;
-        }
-
-        @Override
-        public MatrixBlock call() {
-            aggregateUnaryOperations(_op, _groups, _ret, _rl, _ru, 
_numColumns);
-            return _ret;
-        }
-    }
-
-    private static class UnaryAggregateOverlappingTask implements 
Callable<MatrixBlock> {
-        private final CompressedMatrixBlock _m1;
-        private final int _rl;
-        private final int _ru;
-        private final MatrixBlock _ret;
-        private final AggregateUnaryOperator _op;
-
-        protected UnaryAggregateOverlappingTask(CompressedMatrixBlock m1, 
MatrixBlock ret, int rl, int ru,
-            AggregateUnaryOperator op) {
-            _m1 = m1;
-            _op = op;
-            _rl = rl;
-            _ru = ru;
-            _ret = ret;
-
-        }
-
-        private MatrixBlock setupOutputMatrix() {
-            MatrixBlock outputBlock;
-            if(_op.indexFn instanceof ReduceAll)
-                outputBlock = new MatrixBlock(_ret.getNumRows(), 
_ret.getNumColumns(), false).allocateDenseBlock();
-            else if(_op.indexFn instanceof ReduceCol)
-                outputBlock = new MatrixBlock(_ru - _rl, _ret.getNumColumns(), 
false).allocateDenseBlock();
-            else
-                outputBlock = new MatrixBlock(_ret.getNumRows(), 
_ret.getNumColumns(), false).allocateDenseBlock();
-
-            if(_op.aggOp.increOp.fn instanceof Builtin)
-                if(_op.indexFn instanceof ReduceCol)
-                    System.arraycopy(_ret.getDenseBlockValues(),
-                        _rl * _ret.getNumColumns(),
-                        outputBlock.getDenseBlockValues(),
-                        0,
-                        outputBlock.getDenseBlockValues().length);
-                else
-                    System.arraycopy(_ret.getDenseBlockValues(),
-                        0,
-                        outputBlock.getDenseBlockValues(),
-                        0,
-                        _ret.getDenseBlockValues().length);
-
-            return outputBlock;
-        }
-
-        private MatrixBlock getTmp() {
-            MatrixBlock tmp = memPool.get();
-            if(tmp == null) {
-                memPool.set(new MatrixBlock(_ru - _rl, _m1.getNumColumns(), 
false, -1).allocateBlock());
-                tmp = memPool.get();
-            }
-            else {
-                tmp = memPool.get();
-                tmp.reset(_ru - _rl, _m1.getNumColumns(), false, -1);
-            }
-            return tmp;
-        }
-
-        private MatrixBlock decompressToTemp() {
-            MatrixBlock tmp = getTmp();
-            for(ColGroup g : _m1.getColGroups())
-                g.decompressToBlockSafe(tmp, _rl, _ru, 0, g.getValues(), 
false);
-            tmp.setNonZeros(_rl + _ru);
-            return tmp;
-        }
-
-        @Override
-        public MatrixBlock call() {
-            MatrixBlock tmp = decompressToTemp();
-
-            MatrixBlock outputBlock = setupOutputMatrix();
-            LibMatrixAgg.aggregateUnaryMatrix(tmp, outputBlock, _op);
-
-            if(_op.indexFn instanceof ReduceCol) {
-                double[] retValues = _ret.getDenseBlockValues();
-                int currentIndex = _rl * _ret.getNumColumns();
-                double[] outputBlockValues = outputBlock.getDenseBlockValues();
-                System.arraycopy(outputBlockValues, 0, retValues, 
currentIndex, outputBlockValues.length);
-
-                return null;
-            }
-            else {
-
-                return outputBlock;
-            }
-        }
-    }
+       // private static final Log LOG = 
LogFactory.getLog(LibCompAgg.class.getName());
+
+       // private static final long MIN_PAR_AGG_THRESHOLD = 8 * 1024 * 1024;
+       private static final long MIN_PAR_AGG_THRESHOLD = 8;
+
+       private static ThreadLocal<MatrixBlock> memPool = new 
ThreadLocal<MatrixBlock>() {
+               @Override
+               protected MatrixBlock initialValue() {
+                       return null;
+               }
+       };
+
+       public static MatrixBlock aggregateUnary(CompressedMatrixBlock 
inputMatrix, MatrixBlock outputMatrix,
+               AggregateUnaryOperator op, int blen, MatrixIndexes indexesIn, 
boolean inCP) {
+               if(inputMatrix.getColGroups() != null) {
+                       fillStart(outputMatrix, op);
+
+                       if(inputMatrix.isOverlapping() &&
+                               (op.aggOp.increOp.fn instanceof KahanPlusSq || 
(op.aggOp.increOp.fn instanceof Builtin &&
+                                       (((Builtin) 
op.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MIN ||
+                                               ((Builtin) 
op.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MAX))))
+                               aggregateUnaryOverlapping(inputMatrix, 
outputMatrix, op, indexesIn, inCP);
+                       else
+                               
aggregateUnaryNormalCompressedMatrixBlock(inputMatrix, outputMatrix, op, blen, 
indexesIn, inCP);
+               }
+
+               if(op.aggOp.existsCorrection() && inCP)
+                       outputMatrix.dropLastRowsOrColumns(op.aggOp.correction);
+
+               outputMatrix.recomputeNonZeros();
+
+               return outputMatrix;
+       }
+
+       private static void 
aggregateUnaryNormalCompressedMatrixBlock(CompressedMatrixBlock inputMatrix,
+               MatrixBlock outputMatrix, AggregateUnaryOperator op, int blen, 
MatrixIndexes indexesIn, boolean inCP) {
+
+               aggregateUncompressedColGroups(inputMatrix, outputMatrix, op);
+
+               if(isValidForParallelProcessing(inputMatrix, op))
+                       tryToAggregateInParallel(inputMatrix, outputMatrix, op);
+               else
+                       aggregateSingleThreaded(inputMatrix, outputMatrix, op);
+
+               postProcessAggregate(inputMatrix, outputMatrix, op);
+
+       }
+
+       private static boolean 
isValidForParallelProcessing(CompressedMatrixBlock m1, AggregateUnaryOperator 
op) {
+               return op.getNumThreads() > 1 && m1.getExactSizeOnDisk() > 
MIN_PAR_AGG_THRESHOLD;
+       }
+
+       private static void 
aggregateUncompressedColGroups(CompressedMatrixBlock m1, MatrixBlock ret,
+               AggregateUnaryOperator op) {
+               ColGroupUncompressed uc = m1.getUncompressedColGroup();
+               if(uc != null)
+                       uc.unaryAggregateOperations(op, ret);
+       }
+
+       private static void tryToAggregateInParallel(CompressedMatrixBlock m1, 
MatrixBlock ret, AggregateUnaryOperator op) {
+               int k = op.getNumThreads();
+               if(k == 1)
+                       aggregateUnaryOperations(op, m1.getColGroups(), ret, 0, 
m1.getNumRows(), m1.getNumColumns());
+               else
+                       aggregateInParallel(m1, ret, op, k);
+
+       }
+
+       private static void aggregateInParallel(CompressedMatrixBlock m1, 
MatrixBlock ret, AggregateUnaryOperator op,
+               int k) {
+
+               ExecutorService pool = CommonThreadPool.get(op.getNumThreads());
+               ArrayList<UnaryAggregateTask> tasks = new ArrayList<>();
+               try {
+                       // compute all compressed column groups
+                       if(op.indexFn instanceof ReduceCol) {
+                               final int blkz = 
CompressionSettings.BITMAP_BLOCK_SZ;
+                               // int blklen = Math.max((int) 
Math.ceil((double) m1.getNumRows() / (op.getNumThreads() * 2)), blkz);
+                               // blklen += (blklen % blkz != 0) ? blkz - 
blklen % blkz : 0;
+                               int blklen = blkz * 4;
+                               for(int i = 0; i * blklen < m1.getNumRows(); 
i++)
+                                       tasks.add(new 
UnaryAggregateTask(m1.getColGroups(), ret, i * blklen,
+                                               Math.min((i + 1) * blklen, 
m1.getNumRows()), op, m1.getNumColumns()));
+
+                       }
+                       else {
+                               List<List<ColGroup>> grpParts = 
createTaskPartitionNotIncludingUncompressable(m1.getColGroups(), k);
+                               for(List<ColGroup> grp : grpParts)
+                                       tasks.add(new UnaryAggregateTask(grp, 
ret, 0, m1.getNumRows(), op, m1.getNumColumns(),
+                                               m1.isOverlapping()));
+                       }
+
+                       List<Future<MatrixBlock>> futures = 
pool.invokeAll(tasks);
+                       pool.shutdown();
+
+                       // aggregate partial results
+                       if(op.indexFn instanceof ReduceAll)
+                               if(op.aggOp.increOp.fn instanceof Builtin)
+                                       aggregateResults(ret, futures, op);
+                               else
+                                       sumResults(ret, futures);
+                       else if(op.indexFn instanceof ReduceRow && 
m1.isOverlapping()) {
+                               if(op.aggOp.increOp.fn instanceof Builtin)
+                                       aggregateResultVectors(ret, futures, 
op);
+                               else
+                                       sumResultVectors(ret, futures);
+                       }
+                       else
+                               for(Future<MatrixBlock> f : futures)
+                                       f.get();
+               }
+               catch(InterruptedException | ExecutionException e) {
+                       throw new DMLRuntimeException(e);
+               }
+       }
+
+       private static void sumResults(MatrixBlock ret, 
List<Future<MatrixBlock>> futures)
+               throws InterruptedException, ExecutionException {
+               double val = ret.quickGetValue(0, 0);
+               for(Future<MatrixBlock> rtask : futures) {
+                       double tmp = rtask.get().quickGetValue(0, 0);
+                       val = val + tmp;
+               }
+               ret.quickSetValue(0, 0, val);
+
+       }
+
+       private static void sumResultVectors(MatrixBlock ret, 
List<Future<MatrixBlock>> futures)
+               throws InterruptedException, ExecutionException {
+
+               double[] retVals = ret.getDenseBlockValues();
+               for(Future<MatrixBlock> rtask : futures) {
+                       double[] taskResult = rtask.get().getDenseBlockValues();
+                       for(int i = 0; i < retVals.length; i++) {
+                               retVals[i] += taskResult[i];
+                       }
+               }
+               ret.setNonZeros(ret.getNumColumns());
+       }
+
+       private static void aggregateResults(MatrixBlock ret, 
List<Future<MatrixBlock>> futures, AggregateUnaryOperator op)
+               throws InterruptedException, ExecutionException {
+               double val = ret.quickGetValue(0, 0);
+               for(Future<MatrixBlock> rtask : futures) {
+                       double tmp = rtask.get().quickGetValue(0, 0);
+                       val = op.aggOp.increOp.fn.execute(val, tmp);
+               }
+               ret.quickSetValue(0, 0, val);
+       }
+
+       private static void aggregateResultVectors(MatrixBlock ret, 
List<Future<MatrixBlock>> futures,
+               AggregateUnaryOperator op) throws InterruptedException, 
ExecutionException {
+               double[] retVals = ret.getDenseBlockValues();
+               for(Future<MatrixBlock> rtask : futures) {
+                       double[] taskResult = rtask.get().getDenseBlockValues();
+                       for(int i = 0; i < retVals.length; i++) {
+                               retVals[i] = 
op.aggOp.increOp.fn.execute(retVals[i] ,  taskResult[i]);
+                       }
+               }
+               ret.setNonZeros(ret.getNumColumns());
+       }
+
+       private static void aggregateSingleThreaded(CompressedMatrixBlock m1, 
MatrixBlock ret, AggregateUnaryOperator op) {
+               aggregateUnaryOperations(op, m1.getColGroups(), ret, 0, 
m1.getNumRows(), m1.getNumColumns());
+       }
+
+       private static void divideByNumberOfCellsForMean(CompressedMatrixBlock 
m1, MatrixBlock ret, IndexFunction idxFn) {
+               if(idxFn instanceof ReduceAll)
+                       divideByNumberOfCellsForMeanAll(m1, ret);
+               else if(idxFn instanceof ReduceCol)
+                       divideByNumberOfCellsForMeanRows(m1, ret);
+               else if(idxFn instanceof ReduceRow)
+                       divideByNumberOfCellsForMeanCols(m1, ret);
+       }
+
+       private static void 
divideByNumberOfCellsForMeanRows(CompressedMatrixBlock m1, MatrixBlock ret) {
+               for(int i = 0; i < m1.getNumRows(); i++) {
+                       ret.quickSetValue(i, 0, ret.quickGetValue(i, 0) / 
m1.getNumColumns());
+               }
+       }
+
+       private static void 
divideByNumberOfCellsForMeanCols(CompressedMatrixBlock m1, MatrixBlock ret) {
+               for(int i = 0; i < m1.getNumColumns(); i++) {
+                       ret.quickSetValue(0, i, ret.quickGetValue(0, i) / 
m1.getNumRows());
+               }
+       }
+
+       private static void 
divideByNumberOfCellsForMeanAll(CompressedMatrixBlock m1, MatrixBlock ret) {
+               ret.quickSetValue(0, 0, ret.quickGetValue(0, 0) / 
(m1.getNumColumns() * m1.getNumRows()));
+       }
+
+       private static void postProcessAggregate(CompressedMatrixBlock m1, 
MatrixBlock ret, AggregateUnaryOperator op) {
+               if(op.aggOp.increOp.fn instanceof Mean)
+                       divideByNumberOfCellsForMean(m1, ret, op.indexFn);
+
+       }
+
+       private static void aggregateUnaryOverlapping(CompressedMatrixBlock m1, 
MatrixBlock ret, AggregateUnaryOperator op,
+               MatrixIndexes indexesIn, boolean inCP) {
+               try {
+                       List<Future<MatrixBlock>> rtasks = 
generateUnaryAggregateOverlappingFutures(m1, ret, op);
+                       reduceOverlappingFutures(rtasks, ret, op);
+               }
+               catch(InterruptedException | ExecutionException e) {
+                       throw new DMLRuntimeException(e);
+               }
+
+       }
+
+       private static void reduceOverlappingFutures(List<Future<MatrixBlock>> 
rtasks, MatrixBlock ret,
+               AggregateUnaryOperator op) throws InterruptedException, 
ExecutionException {
+               if(isReduceAll(ret, op.indexFn))
+                       reduceAllOverlappingFutures(rtasks, ret, op);
+               else if(op.indexFn instanceof ReduceRow)
+                       reduceColOverlappingFutures(rtasks, ret, op);
+               else
+                       reduceRowOverlappingFutures(rtasks, ret, op);
+       }
+
+       private static void 
reduceColOverlappingFutures(List<Future<MatrixBlock>> rtasks, MatrixBlock ret,
+               AggregateUnaryOperator op) throws InterruptedException, 
ExecutionException {
+               for(Future<MatrixBlock> rtask : rtasks) {
+                       LibMatrixBincell.bincellOpInPlace(ret, rtask.get(),
+                               (op.aggOp.increOp.fn instanceof KahanFunction) 
? new BinaryOperator(
+                                       Plus.getPlusFnObject()) : 
op.aggOp.increOp);
+               }
+       }
+
+       private static void 
reduceRowOverlappingFutures(List<Future<MatrixBlock>> rtasks, MatrixBlock ret,
+               AggregateUnaryOperator op) throws InterruptedException, 
ExecutionException {
+               for(Future<MatrixBlock> rtask : rtasks) {
+                       rtask.get();
+               }
+       }
+
+       private static boolean isReduceAll(MatrixBlock ret, IndexFunction 
idxFn) {
+               return idxFn instanceof ReduceAll || (ret.getNumColumns() == 1 
&& ret.getNumRows() == 1);
+       }
+
+       private static void 
reduceAllOverlappingFutures(List<Future<MatrixBlock>> rtasks, MatrixBlock ret,
+               AggregateUnaryOperator op) throws InterruptedException, 
ExecutionException {
+
+               if(op.aggOp.increOp.fn instanceof KahanFunction) {
+                       KahanObject kbuff = new 
KahanObject(ret.quickGetValue(0, 0), 0);
+                       KahanPlus kplus = KahanPlus.getKahanPlusFnObject();
+                       for(Future<MatrixBlock> rtask : rtasks) {
+                               double tmp = rtask.get().quickGetValue(0, 0);
+
+                               kplus.execute2(kbuff, tmp);
+                       }
+                       ret.quickSetValue(0, 0, kbuff._sum);
+               }
+               else {
+                       double val = ret.quickGetValue(0, 0);
+                       for(Future<MatrixBlock> rtask : rtasks) {
+                               double tmp = rtask.get().quickGetValue(0, 0);
+                               val = op.aggOp.increOp.fn.execute(val, tmp);
+
+                       }
+                       ret.quickSetValue(0, 0, val);
+               }
+       }
+
+       private static List<Future<MatrixBlock>> 
generateUnaryAggregateOverlappingFutures(CompressedMatrixBlock m1,
+               MatrixBlock ret, AggregateUnaryOperator op) throws 
InterruptedException {
+
+               ExecutorService pool = CommonThreadPool.get(op.getNumThreads());
+               ArrayList<UnaryAggregateOverlappingTask> tasks = new 
ArrayList<>();
+
+               final int blklen = Math.min(m1.getNumRows() / 
op.getNumThreads(), CompressionSettings.BITMAP_BLOCK_SZ);
+
+               for(int i = 0; i * blklen < m1.getNumRows(); i++)
+                       tasks.add(new UnaryAggregateOverlappingTask(m1, ret, i 
* blklen,
+                               Math.min((i + 1) * blklen, m1.getNumRows()), 
op));
+
+               List<Future<MatrixBlock>> futures = pool.invokeAll(tasks);
+               pool.shutdown();
+               return futures;
+       }
+
+       private static List<List<ColGroup>> 
createTaskPartitionNotIncludingUncompressable(List<ColGroup> colGroups, int k) {
+               int numTasks = Math.min(k, colGroups.size());
+               List<List<ColGroup>> grpParts = new ArrayList<>();
+               for(int i = 0; i < numTasks; i++) {
+                       grpParts.add(new ArrayList<>());
+               }
+               int pos = 0;
+               for(ColGroup grp : colGroups) {
+                       if(!(grp instanceof ColGroupUncompressed)) {
+                               List<ColGroup> g = grpParts.get(pos);
+                               g.add(grp);
+                               pos = (pos + 1) % numTasks;
+                       }
+               }
+
+               return grpParts;
+       }
+
+       private static void aggregateUnaryOperations(AggregateUnaryOperator op, 
List<ColGroup> groups, MatrixBlock ret,
+               int rl, int ru, int numColumns) {
+               if(op.indexFn instanceof ReduceCol && op.aggOp.increOp.fn 
instanceof Builtin)
+                       aggregateUnaryBuiltinRowOperation(op, groups, ret, rl, 
ru, numColumns);
+               else
+                       aggregateUnaryNormalOperation(op, groups, ret, rl, ru, 
numColumns);
+       }
+
+       private static void 
aggregateUnaryNormalOperation(AggregateUnaryOperator op, List<ColGroup> groups, 
MatrixBlock ret,
+               int rl, int ru, int numColumns) {
+               for(ColGroup grp : groups)
+                       grp.unaryAggregateOperations(op, ret, rl, ru);
+
+       }
+
+       private static void 
aggregateUnaryBuiltinRowOperation(AggregateUnaryOperator op, List<ColGroup> 
groups,
+               MatrixBlock ret, int rl, int ru, int numColumns) {
+
+               int[] rnnz = null;
+               int numberDenseColumns = 0;
+               for(ColGroup grp : groups) {
+                       grp.unaryAggregateOperations(op, ret, rl, ru);
+                       if(grp.isDense())
+                               numberDenseColumns += grp.getNumCols();
+                       else{
+                               if (rnnz == null)
+                                       rnnz = new int[ru -  rl];
+                               grp.countNonZerosPerRow(rnnz, rl, ru);
+                       }
+               }
+               if(rnnz != null)
+                       for(int row = rl; row < ru; row++)
+                               if(rnnz[row-rl] + numberDenseColumns < 
numColumns)
+                                       ret.quickSetValue(row, 0, 
op.aggOp.increOp.fn.execute(ret.quickGetValue(row, 0), 0.0));
+
+       }
+
+       private static void fillStart(MatrixBlock ret, AggregateUnaryOperator 
op) {
+               if(op.aggOp.increOp.fn instanceof Builtin) {
+                       Double val = null;
+                       switch(((Builtin) 
op.aggOp.increOp.fn).getBuiltinCode()) {
+                               case MAX:
+                                       val = Double.NEGATIVE_INFINITY;
+                                       break;
+                               case MIN:
+                                       val = Double.POSITIVE_INFINITY;
+                                       break;
+                               default:
+                                       break;
+                       }
+                       if(val != null) {
+                               ret.getDenseBlock().set(val);
+                       }
+               }
+       }
+
+       private static class UnaryAggregateTask implements 
Callable<MatrixBlock> {
+               private final List<ColGroup> _groups;
+               private final int _rl;
+               private final int _ru;
+               private final MatrixBlock _ret;
+               private final int _numColumns;
+               private final AggregateUnaryOperator _op;
+
+               protected UnaryAggregateTask(List<ColGroup> groups, MatrixBlock 
ret, int rl, int ru, AggregateUnaryOperator op,
+                       int numColumns) {
+                       _groups = groups;
+                       _op = op;
+                       _rl = rl;
+                       _ru = ru;
+                       _numColumns = numColumns;
+
+                       if(_op.indexFn instanceof ReduceAll) { // sum
+                               _ret = new MatrixBlock(1, 1, false);
+                               _ret.allocateDenseBlock();
+                               if(_op.aggOp.increOp.fn instanceof Builtin)
+                                       
System.arraycopy(ret.getDenseBlockValues(),
+                                               0,
+                                               _ret.getDenseBlockValues(),
+                                               0,
+                                               ret.getNumRows() * 
ret.getNumColumns());
+                       }
+                       else // colSums / rowSums
+                               _ret = ret;
+
+               }
+
+               protected UnaryAggregateTask(List<ColGroup> groups, MatrixBlock 
ret, int rl, int ru, AggregateUnaryOperator op,
+                       int numColumns, boolean overlapping) {
+                       _groups = groups;
+                       _op = op;
+                       _rl = rl;
+                       _ru = ru;
+                       _numColumns = numColumns;
+
+                       if(_op.indexFn instanceof ReduceAll || (_op.indexFn 
instanceof ReduceRow && overlapping)) {
+                               _ret = new MatrixBlock(ret.getNumRows(), 
ret.getNumColumns(), false);
+                               _ret.allocateDenseBlock();
+                               if(_op.aggOp.increOp.fn instanceof Builtin)
+                                       
System.arraycopy(ret.getDenseBlockValues(),
+                                               0,
+                                               _ret.getDenseBlockValues(),
+                                               0,
+                                               ret.getNumRows() * 
ret.getNumColumns());
+                       }
+                       else // colSums / rowSums
+                               _ret = ret;
+               }
+
+               @Override
+               public MatrixBlock call() {
+                       aggregateUnaryOperations(_op, _groups, _ret, _rl, _ru, 
_numColumns);
+                       return _ret;
+               }
+       }
+
+       private static class UnaryAggregateOverlappingTask implements 
Callable<MatrixBlock> {
+               private final CompressedMatrixBlock _m1;
+               private final int _rl;
+               private final int _ru;
+               private final MatrixBlock _ret;
+               private final AggregateUnaryOperator _op;
+
+               protected UnaryAggregateOverlappingTask(CompressedMatrixBlock 
m1, MatrixBlock ret, int rl, int ru,
+                       AggregateUnaryOperator op) {
+                       _m1 = m1;
+                       _op = op;
+                       _rl = rl;
+                       _ru = ru;
+                       _ret = ret;
+
+               }
+
+               private MatrixBlock setupOutputMatrix() {
+                       MatrixBlock outputBlock;
+                       if(_op.indexFn instanceof ReduceAll)
+                               outputBlock = new 
MatrixBlock(_ret.getNumRows(), _ret.getNumColumns(), 
false).allocateDenseBlock();
+                       else if(_op.indexFn instanceof ReduceCol)
+                               outputBlock = new MatrixBlock(_ru - _rl, 
_ret.getNumColumns(), false).allocateDenseBlock();
+                       else
+                               outputBlock = new 
MatrixBlock(_ret.getNumRows(), _ret.getNumColumns(), 
false).allocateDenseBlock();
+
+                       if(_op.aggOp.increOp.fn instanceof Builtin)
+                               if(_op.indexFn instanceof ReduceCol)
+                                       
System.arraycopy(_ret.getDenseBlockValues(),
+                                               _rl * _ret.getNumColumns(),
+                                               
outputBlock.getDenseBlockValues(),
+                                               0,
+                                               
outputBlock.getDenseBlockValues().length);
+                               else
+                                       
System.arraycopy(_ret.getDenseBlockValues(),
+                                               0,
+                                               
outputBlock.getDenseBlockValues(),
+                                               0,
+                                               
_ret.getDenseBlockValues().length);
+
+                       return outputBlock;
+               }
+
+               private MatrixBlock getTmp() {
+                       MatrixBlock tmp = memPool.get();
+                       if(tmp == null) {
+                               memPool.set(new MatrixBlock(_ru - _rl, 
_m1.getNumColumns(), false, -1).allocateBlock());
+                               tmp = memPool.get();
+                       }
+                       else {
+                               tmp = memPool.get();
+                               tmp.reset(_ru - _rl, _m1.getNumColumns(), 
false, -1);
+                       }
+                       return tmp;
+               }
+
+               private MatrixBlock decompressToTemp() {
+                       MatrixBlock tmp = getTmp();
+                       for(ColGroup g : _m1.getColGroups())
+                               g.decompressToBlockSafe(tmp, _rl, _ru, 0, 
g.getValues(), false);
+                       tmp.setNonZeros(_rl + _ru);
+                       return tmp;
+               }
+
+               @Override
+               public MatrixBlock call() {
+                       MatrixBlock tmp = decompressToTemp();
+
+                       MatrixBlock outputBlock = setupOutputMatrix();
+                       LibMatrixAgg.aggregateUnaryMatrix(tmp, outputBlock, 
_op);
+
+                       if(_op.indexFn instanceof ReduceCol) {
+                               double[] retValues = _ret.getDenseBlockValues();
+                               int currentIndex = _rl * _ret.getNumColumns();
+                               double[] outputBlockValues = 
outputBlock.getDenseBlockValues();
+                               System.arraycopy(outputBlockValues, 0, 
retValues, currentIndex, outputBlockValues.length);
+
+                               return null;
+                       }
+                       else {
+
+                               return outputBlock;
+                       }
+               }
+       }
 }

Reply via email to