[SYSTEMML-2237] Performance spark mapmm (lazy-iter) / reshape (sparse) This patch makes a minor performance improvement to Spark mapmm operations that require a flatMap: they now return a lazy iterator, which can improve memory efficiency and thus reduce unnecessary GC overhead. It also includes several cleanups, among them the Spark reshape operations.
Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/f5d97c55 Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/f5d97c55 Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/f5d97c55 Branch: refs/heads/master Commit: f5d97c55162bc9c7db57e685c04be9aafe9657f6 Parents: 6aaea2f Author: Matthias Boehm <[email protected]> Authored: Mon Apr 16 23:41:31 2018 -0700 Committer: Matthias Boehm <[email protected]> Committed: Mon Apr 16 23:41:31 2018 -0700 ---------------------------------------------------------------------- .../mr/AggregateBinaryInstruction.java | 6 +- .../mr/MatrixReshapeMRInstruction.java | 22 +-- .../instructions/spark/CpmmSPInstruction.java | 4 +- .../instructions/spark/MapmmSPInstruction.java | 49 +++--- .../spark/MatrixReshapeSPInstruction.java | 6 +- .../instructions/spark/PMapmmSPInstruction.java | 4 +- .../instructions/spark/RmmSPInstruction.java | 3 +- .../instructions/spark/Tsmm2SPInstruction.java | 4 +- .../instructions/spark/ZipmmSPInstruction.java | 5 +- .../runtime/matrix/data/LibMatrixReorg.java | 151 ++++++++----------- .../matrix/data/OperationsOnMatrixValues.java | 4 +- .../mapred/MMCJMRReducerWithAggregator.java | 8 +- 12 files changed, 110 insertions(+), 156 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/f5d97c55/src/main/java/org/apache/sysml/runtime/instructions/mr/AggregateBinaryInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/mr/AggregateBinaryInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/mr/AggregateBinaryInstruction.java index 2e75c9a..3c4a10c 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/mr/AggregateBinaryInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/mr/AggregateBinaryInstruction.java 
@@ -154,7 +154,7 @@ public class AggregateBinaryInstruction extends BinaryMRInstructionBase implemen out=cachedValues.holdPlace(output, valueClass); //process instruction - OperationsOnMatrixValues.performAggregateBinary( + OperationsOnMatrixValues.matMult( in1.getIndexes(), (MatrixBlock) in1.getValue(), in2.getIndexes(), (MatrixBlock) in2.getValue(), out.getIndexes(), (MatrixBlock) out.getValue(), @@ -200,7 +200,7 @@ public class AggregateBinaryInstruction extends BinaryMRInstructionBase implemen IndexedMatrixValue out = cachedValues.holdPlace(output, valueClass); //process instruction - OperationsOnMatrixValues.performAggregateBinary(in1.getIndexes(), (MatrixBlock)in1.getValue(), + OperationsOnMatrixValues.matMult(in1.getIndexes(), (MatrixBlock)in1.getValue(), in2BlockIndex, (MatrixBlock) in2BlockValue, out.getIndexes(), (MatrixBlock)out.getValue(), ((AggregateBinaryOperator)optr)); removeOutput &= ( !_outputEmptyBlocks && out.getValue().isEmpty() ); @@ -227,7 +227,7 @@ public class AggregateBinaryInstruction extends BinaryMRInstructionBase implemen IndexedMatrixValue out = cachedValues.holdPlace(output, valueClass); //process instruction - OperationsOnMatrixValues.performAggregateBinary( + OperationsOnMatrixValues.matMult( in1BlockIndex, (MatrixBlock)in1BlockValue, in2.getIndexes(), (MatrixBlock)in2.getValue(), out.getIndexes(), (MatrixBlock)out.getValue(), http://git-wip-us.apache.org/repos/asf/systemml/blob/f5d97c55/src/main/java/org/apache/sysml/runtime/instructions/mr/MatrixReshapeMRInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/mr/MatrixReshapeMRInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/mr/MatrixReshapeMRInstruction.java index fc1e05f..802851c 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/mr/MatrixReshapeMRInstruction.java +++ 
b/src/main/java/org/apache/sysml/runtime/instructions/mr/MatrixReshapeMRInstruction.java @@ -20,6 +20,7 @@ package org.apache.sysml.runtime.instructions.mr; import java.util.ArrayList; +import java.util.List; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.instructions.InstructionUtils; @@ -36,8 +37,6 @@ public class MatrixReshapeMRInstruction extends UnaryInstruction { private MatrixCharacteristics _mcIn = null; private MatrixCharacteristics _mcOut = null; - private ArrayList<IndexedMatrixValue> _cache = null; - private MatrixReshapeMRInstruction(Operator op, byte in, long rows, long cols, boolean byrow, byte out, String istr) { super(MRType.MMTSJ, op, in, out, istr); @@ -46,8 +45,7 @@ public class MatrixReshapeMRInstruction extends UnaryInstruction { _byrow = byrow; } - public void setMatrixCharacteristics( MatrixCharacteristics mcIn, MatrixCharacteristics mcOut ) - { + public void setMatrixCharacteristics( MatrixCharacteristics mcIn, MatrixCharacteristics mcOut ) { _mcIn = mcIn; } @@ -73,25 +71,17 @@ public class MatrixReshapeMRInstruction extends UnaryInstruction { { ArrayList<IndexedMatrixValue> blkList = cachedValues.get(input); if( blkList != null ) - for(IndexedMatrixValue imv : blkList) - { - if( imv == null ) - continue; + for(IndexedMatrixValue imv : blkList) { + if( imv == null ) continue; - //get cached blocks - ArrayList<IndexedMatrixValue> out = _cache; - //process instruction _mcOut.setBlockSize(brlen, bclen); - out = LibMatrixReorg.reshape(imv, _mcIn, out, _mcOut, _byrow, true); + List<IndexedMatrixValue> out = + LibMatrixReorg.reshape(imv, _mcIn, _mcOut, _byrow, true); //put the output values in the output cache for( IndexedMatrixValue outBlk : out ) cachedValues.add(output, outBlk); - - //put blocks into own cache - if( LibMatrixReorg.ALLOW_BLOCK_REUSE ) - _cache = out; } } 
http://git-wip-us.apache.org/repos/asf/systemml/blob/f5d97c55/src/main/java/org/apache/sysml/runtime/instructions/spark/CpmmSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CpmmSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CpmmSPInstruction.java index de08d83..308e60f 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CpmmSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CpmmSPInstruction.java @@ -207,7 +207,7 @@ public class CpmmSPInstruction extends BinarySPInstruction { //core block matrix multiplication MatrixBlock blkOut = OperationsOnMatrixValues - .performAggregateBinaryIgnoreIndexes(blkIn1, blkIn2, new MatrixBlock(), _op); + .matMult(blkIn1, blkIn2, new MatrixBlock(), _op); //return target block ixOut.setIndexes(arg0._2()._1().getIndexes().getRowIndex(), @@ -236,7 +236,7 @@ public class CpmmSPInstruction extends BinarySPInstruction { .reorgOperations(_rop, new MatrixBlock(), 0, 0, 0); //core block matrix multiplication return OperationsOnMatrixValues - .performAggregateBinaryIgnoreIndexes(in1, in2, new MatrixBlock(), _op); + .matMult(in1, in2, new MatrixBlock(), _op); } } } http://git-wip-us.apache.org/repos/asf/systemml/blob/f5d97c55/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java index 21c1be3..7c8d606 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java @@ -20,8 +20,8 @@ package org.apache.sysml.runtime.instructions.spark; -import java.util.ArrayList; 
import java.util.Iterator; +import java.util.stream.IntStream; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; @@ -275,8 +275,8 @@ public class MapmmSPInstruction extends BinarySPInstruction { MatrixBlock left = _pbc.getBlock(1, (int)ixIn.getRowIndex()); //execute matrix-vector mult - OperationsOnMatrixValues.performAggregateBinary( - new MatrixIndexes(1,ixIn.getRowIndex()), left, ixIn, blkIn, ixOut, blkOut, _op); + OperationsOnMatrixValues.matMult(new MatrixIndexes(1,ixIn.getRowIndex()), + left, ixIn, blkIn, ixOut, blkOut, _op); } else //if( _type == CacheType.RIGHT ) { @@ -284,8 +284,8 @@ public class MapmmSPInstruction extends BinarySPInstruction { MatrixBlock right = _pbc.getBlock((int)ixIn.getColumnIndex(), 1); //execute matrix-vector mult - OperationsOnMatrixValues.performAggregateBinary( - ixIn, blkIn, new MatrixIndexes(ixIn.getColumnIndex(),1), right, ixOut, blkOut, _op); + OperationsOnMatrixValues.matMult(ixIn, blkIn, + new MatrixIndexes(ixIn.getColumnIndex(),1), right, ixOut, blkOut, _op); } //output new tuple @@ -327,7 +327,7 @@ public class MapmmSPInstruction extends BinarySPInstruction { MatrixBlock left = _pbc.getBlock(1, (int)ixIn.getRowIndex()); //execute matrix-vector mult - return OperationsOnMatrixValues.performAggregateBinaryIgnoreIndexes( + return OperationsOnMatrixValues.matMult( left, blkIn, new MatrixBlock(), _op); } else //if( _type == CacheType.RIGHT ) @@ -336,7 +336,7 @@ public class MapmmSPInstruction extends BinarySPInstruction { MatrixBlock right = _pbc.getBlock((int)ixIn.getColumnIndex(), 1); //execute matrix-vector mult - return OperationsOnMatrixValues.performAggregateBinaryIgnoreIndexes( + return OperationsOnMatrixValues.matMult( blkIn, right, new MatrixBlock(), _op); } } @@ -392,7 +392,7 @@ public class MapmmSPInstruction extends BinarySPInstruction { MatrixBlock left = _pbc.getBlock(1, (int)ixIn.getRowIndex()); //execute index preserving matrix multiplication - 
OperationsOnMatrixValues.performAggregateBinaryIgnoreIndexes(left, blkIn, blkOut, _op); + OperationsOnMatrixValues.matMult(left, blkIn, blkOut, _op); } else //if( _type == CacheType.RIGHT ) { @@ -400,7 +400,7 @@ public class MapmmSPInstruction extends BinarySPInstruction { MatrixBlock right = _pbc.getBlock((int)ixIn.getColumnIndex(), 1); //execute index preserving matrix multiplication - OperationsOnMatrixValues.performAggregateBinaryIgnoreIndexes(blkIn, right, blkOut, _op); + OperationsOnMatrixValues.matMult(blkIn, right, blkOut, _op); } return new Tuple2<>(ixIn, blkOut); @@ -430,32 +430,23 @@ public class MapmmSPInstruction extends BinarySPInstruction { public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call( Tuple2<MatrixIndexes, MatrixBlock> arg0 ) throws Exception { - ArrayList<Tuple2<MatrixIndexes, MatrixBlock>> ret = new ArrayList<>(); MatrixIndexes ixIn = arg0._1(); MatrixBlock blkIn = arg0._2(); - + if( _type == CacheType.LEFT ) { - //for all matching left-hand-side blocks - int len = _pbc.getNumRowBlocks(); - for( int i=1; i<=len; i++ ) { - MatrixBlock left = _pbc.getBlock(i, (int)ixIn.getRowIndex()); - MatrixBlock blkOut = OperationsOnMatrixValues - .performAggregateBinaryIgnoreIndexes(left, blkIn, new MatrixBlock(), _op); - ret.add(new Tuple2<>(new MatrixIndexes(i, ixIn.getColumnIndex()), blkOut)); - } + //for all matching left-hand-side blocks, returned as lazy iterator + return IntStream.range(1, _pbc.getNumRowBlocks()+1).mapToObj(i -> + new Tuple2<>(new MatrixIndexes(i, ixIn.getColumnIndex()), + OperationsOnMatrixValues.matMult(_pbc.getBlock(i, (int)ixIn.getRowIndex()), blkIn, + new MatrixBlock(), _op))).iterator(); } else { //RIGHT - //for all matching right-hand-side blocks - int len = _pbc.getNumColumnBlocks(); - for( int j=1; j<=len; j++ ) { - MatrixBlock right = _pbc.getBlock((int)ixIn.getColumnIndex(), j); - MatrixBlock blkOut = OperationsOnMatrixValues - .performAggregateBinaryIgnoreIndexes(blkIn, right, new MatrixBlock(), _op); - ret.add(new 
Tuple2<>(new MatrixIndexes(ixIn.getRowIndex(), j), blkOut)); - } + //for all matching right-hand-side blocks, returned as lazy iterator + return IntStream.range(1, _pbc.getNumColumnBlocks()+1).mapToObj(j -> + new Tuple2<>(new MatrixIndexes(ixIn.getRowIndex(), j), + OperationsOnMatrixValues.matMult(blkIn, _pbc.getBlock((int)ixIn.getColumnIndex(), j), + new MatrixBlock(), _op))).iterator(); } - - return ret.iterator(); } } } http://git-wip-us.apache.org/repos/asf/systemml/blob/f5d97c55/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixReshapeSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixReshapeSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixReshapeSPInstruction.java index 8a1c325..97f112c 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixReshapeSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixReshapeSPInstruction.java @@ -19,8 +19,8 @@ package org.apache.sysml.runtime.instructions.spark; -import java.util.ArrayList; import java.util.Iterator; +import java.util.List; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.function.PairFlatMapFunction; @@ -138,8 +138,8 @@ public class MatrixReshapeSPInstruction extends UnarySPInstruction IndexedMatrixValue in = SparkUtils.toIndexedMatrixBlock(arg0); //execute actual reshape operation - ArrayList<IndexedMatrixValue> out = LibMatrixReorg - .reshape(in, _mcIn, new ArrayList<>(), _mcOut, _byrow, _outputEmptyBlocks); + List<IndexedMatrixValue> out = LibMatrixReorg + .reshape(in, _mcIn, _mcOut, _byrow, _outputEmptyBlocks); //output conversion (for compatibility w/ rdd schema) return SparkUtils.fromIndexedMatrixBlock(out).iterator(); 
http://git-wip-us.apache.org/repos/asf/systemml/blob/f5d97c55/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java index 2e7ac11..1b6435b 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java @@ -193,8 +193,8 @@ public class PMapmmSPInstruction extends BinarySPInstruction { MatrixBlock left = pm.getBlock(i, (int)ixIn.getRowIndex()); //execute matrix-vector mult - OperationsOnMatrixValues.performAggregateBinary( - new MatrixIndexes(i,ixIn.getRowIndex()), left, ixIn, blkIn, ixOut, blkOut, _op); + OperationsOnMatrixValues.matMult(new MatrixIndexes(i,ixIn.getRowIndex()), + left, ixIn, blkIn, ixOut, blkOut, _op); //output new tuple ixOut.setIndexes(_offset+i, ixOut.getColumnIndex()); http://git-wip-us.apache.org/repos/asf/systemml/blob/f5d97c55/src/main/java/org/apache/sysml/runtime/instructions/spark/RmmSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/RmmSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/RmmSPInstruction.java index 294c142..90e5396 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/RmmSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/RmmSPInstruction.java @@ -191,8 +191,7 @@ public class RmmSPInstruction extends BinarySPInstruction { MatrixBlock blkIn2 = arg0._2()._2(); //core block matrix multiplication - MatrixBlock blkOut = OperationsOnMatrixValues - .performAggregateBinaryIgnoreIndexes(blkIn1, blkIn2, new MatrixBlock(), _op); + MatrixBlock blkOut = 
OperationsOnMatrixValues.matMult(blkIn1, blkIn2, new MatrixBlock(), _op); //output new tuple return new Tuple2<>(ixOut, blkOut); http://git-wip-us.apache.org/repos/asf/systemml/blob/f5d97c55/src/main/java/org/apache/sysml/runtime/instructions/spark/Tsmm2SPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/Tsmm2SPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/Tsmm2SPInstruction.java index cabc2c8..5bb686b 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/Tsmm2SPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/Tsmm2SPInstruction.java @@ -154,7 +154,7 @@ public class Tsmm2SPInstruction extends UnarySPInstruction { (int)(_type.isLeft()?1:ixin.getColumnIndex())); MatrixBlock mbin2t = transpose(mbin2, new MatrixBlock()); //prep for transpose rewrite mm - MatrixBlock out2 = (MatrixBlock) OperationsOnMatrixValues.performAggregateBinaryIgnoreIndexes( //mm + MatrixBlock out2 = (MatrixBlock) OperationsOnMatrixValues.matMult( //mm _type.isLeft() ? mbin2t : mbin, _type.isLeft() ? mbin : mbin2t, new MatrixBlock(), _op); MatrixIndexes ixout2 = _type.isLeft() ? new MatrixIndexes(2,1) : new MatrixIndexes(1,2); ret.add(new Tuple2<>(ixout2, out2)); @@ -215,7 +215,7 @@ public class Tsmm2SPInstruction extends UnarySPInstruction { (int)(_type.isLeft()?1:ixin.getColumnIndex())); MatrixBlock mbin2t = transpose(mbin2, new MatrixBlock()); //prep for transpose rewrite mm - MatrixBlock out2 = OperationsOnMatrixValues.performAggregateBinaryIgnoreIndexes( //mm + MatrixBlock out2 = OperationsOnMatrixValues.matMult( //mm _type.isLeft() ? mbin2t : mbin, _type.isLeft() ? mbin : mbin2t, new MatrixBlock(), _op); MatrixIndexes ixout2 = _type.isLeft() ? 
new MatrixIndexes(2,1) : new MatrixIndexes(1,2); http://git-wip-us.apache.org/repos/asf/systemml/blob/f5d97c55/src/main/java/org/apache/sysml/runtime/instructions/spark/ZipmmSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/ZipmmSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/ZipmmSPInstruction.java index 4f168c1..42313fa 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/ZipmmSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/ZipmmSPInstruction.java @@ -123,10 +123,9 @@ public class ZipmmSPInstruction extends BinarySPInstruction { //transpose right input (for vectors no-op) MatrixBlock tmp = (MatrixBlock)in2.reorgOperations(_rop, new MatrixBlock(), 0, 0, 0); - + //core matrix multiplication (for t(y)%*%X or t(X)%*%y) - return OperationsOnMatrixValues - .performAggregateBinaryIgnoreIndexes(tmp, in1, new MatrixBlock(), _abop); + return OperationsOnMatrixValues.matMult(tmp, in1, new MatrixBlock(), _abop); } } } http://git-wip-us.apache.org/repos/asf/systemml/blob/f5d97c55/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java index 8de6f13..78b730c 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java @@ -28,10 +28,11 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; -import java.util.Map.Entry; +import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.stream.Collectors; import 
org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.caching.MatrixObject.UpdateType; @@ -69,9 +70,6 @@ public class LibMatrixReorg //safe due to copy-on-write and safe update-in-place handling) public static final boolean SHALLOW_COPY_REORG = true; - //allow reuse of temporary blocks for certain operations - public static final boolean ALLOW_BLOCK_REUSE = false; - //use csr instead of mcsr sparse block for rexpand columns / diag v2m public static final boolean SPARSE_OUTPUTS_IN_CSR = true; @@ -465,15 +463,15 @@ public class LibMatrixReorg * @param outputEmptyBlocks output blocks with nnz=0 * @return list of indexed matrix values */ - public static ArrayList<IndexedMatrixValue> reshape( IndexedMatrixValue in, MatrixCharacteristics mcIn, - ArrayList<IndexedMatrixValue> out, MatrixCharacteristics mcOut, boolean rowwise, boolean outputEmptyBlocks ) { + public static List<IndexedMatrixValue> reshape(IndexedMatrixValue in, MatrixCharacteristics mcIn, + MatrixCharacteristics mcOut, boolean rowwise, boolean outputEmptyBlocks ) { //prepare inputs MatrixIndexes ixIn = in.getIndexes(); MatrixBlock mbIn = (MatrixBlock) in.getValue(); //prepare result blocks (no reuse in order to guarantee mem constraints) Collection<MatrixIndexes> rix = computeAllResultBlockIndexes(ixIn, mcIn, mcOut, mbIn, rowwise, outputEmptyBlocks); - HashMap<MatrixIndexes, MatrixBlock> rblk = createAllResultBlocks(rix, mbIn.nonZeros, mcIn, mcOut, rowwise, out); + Map<MatrixIndexes, MatrixBlock> rblk = createAllResultBlocks(rix, mbIn.nonZeros, mcOut); //basic algorithm long row_offset = (ixIn.getRowIndex()-1)*mcIn.getRowsPerBlock(); @@ -483,15 +481,11 @@ public class LibMatrixReorg else //dense reshapeDense(mbIn, row_offset, col_offset, rblk, mcIn, mcOut, rowwise); - //prepare output - out = new ArrayList<>(); - for( Entry<MatrixIndexes, MatrixBlock> e : rblk.entrySet() ) - if( outputEmptyBlocks || !e.getValue().isEmptyBlock(false) ) { - e.getValue().examSparsity(); 
//ensure correct format - out.add(new IndexedMatrixValue(e.getKey(),e.getValue())); - } - - return out; + //prepare output (sparsity switch, wrapper) + return rblk.entrySet().stream() + .filter( e -> outputEmptyBlocks || !e.getValue().isEmptyBlock(false)) + .map(e -> {e.getValue().examSparsity(); return new IndexedMatrixValue(e.getKey(),e.getValue());}) + .collect(Collectors.toList()); } /** @@ -1552,46 +1546,26 @@ public class LibMatrixReorg row_offset+cell.getI(), col_offset+cell.getJ(), mcIn, mcOut, rowwise)); } } - - @SuppressWarnings("unused") - private static HashMap<MatrixIndexes, MatrixBlock> createAllResultBlocks( Collection<MatrixIndexes> rix, - long nnz, MatrixCharacteristics mcIn, MatrixCharacteristics mcOut, - boolean rowwise, ArrayList<IndexedMatrixValue> reuse ) - { - HashMap<MatrixIndexes, MatrixBlock> ret = new HashMap<MatrixIndexes,MatrixBlock>(); - long nBlocks = rix.size(); - int count = 0; - - for( MatrixIndexes ix : rix ) - { - //compute indexes - long bi = ix.getRowIndex(); - long bj = ix.getColumnIndex(); - int lbrlen = UtilFunctions.computeBlockSize(mcOut.getRows(), bi, mcOut.getRowsPerBlock()); - int lbclen = UtilFunctions.computeBlockSize(mcOut.getCols(), bj, mcOut.getColsPerBlock()); - - //create result block - int estnnz = (int) (nnz/nBlocks); //force initialcapacity per row to 1, for many blocks - boolean sparse = MatrixBlock.evalSparseFormatInMemory(lbrlen, lbclen, estnnz); - MatrixBlock block = null; - if( ALLOW_BLOCK_REUSE && reuse!=null && !reuse.isEmpty()) { - block = (MatrixBlock) reuse.get(count++).getValue(); - block.reset(lbrlen, lbclen, sparse, estnnz); - } - else - block = new MatrixBlock(lbrlen, lbclen, sparse, estnnz); - - if( lbrlen<1 || lbclen<1 ) - throw new RuntimeException("Computed block dimensions ("+bi+","+bj+" -> "+lbrlen+","+lbclen+") are invalid!"); - - ret.put(ix, block); - } - - return ret; + + private static Map<MatrixIndexes, MatrixBlock> createAllResultBlocks(Collection<MatrixIndexes> rix, long nnz, 
MatrixCharacteristics mcOut) { + return rix.stream().collect(Collectors.toMap(ix -> ix, ix -> createResultBlock(ix, nnz, rix.size(), mcOut))); + } + + private static MatrixBlock createResultBlock(MatrixIndexes ix, long nnz, int nBlocks, MatrixCharacteristics mcOut) { + //compute indexes + long bi = ix.getRowIndex(); + long bj = ix.getColumnIndex(); + int lbrlen = UtilFunctions.computeBlockSize(mcOut.getRows(), bi, mcOut.getRowsPerBlock()); + int lbclen = UtilFunctions.computeBlockSize(mcOut.getCols(), bj, mcOut.getColsPerBlock()); + if( lbrlen<1 || lbclen<1 ) + throw new DMLRuntimeException("Computed block dimensions ("+bi+","+bj+" -> "+lbrlen+","+lbclen+") are invalid!"); + //create result block + int estnnz = (int) (nnz/nBlocks); //force initial capacity per row to 1, for many blocks + boolean sparse = MatrixBlock.evalSparseFormatInMemory(lbrlen, lbclen, estnnz); + return new MatrixBlock(lbrlen, lbclen, sparse, estnnz); } - private static void reshapeDense( MatrixBlock in, long row_offset, long col_offset, - HashMap<MatrixIndexes,MatrixBlock> rix, + private static void reshapeDense( MatrixBlock in, long row_offset, long col_offset, Map<MatrixIndexes,MatrixBlock> rix, MatrixCharacteristics mcIn, MatrixCharacteristics mcOut, boolean rowwise ) { if( in.isEmptyBlock(false) ) return; @@ -1622,14 +1596,12 @@ public class LibMatrixReorg //cleanup for sparse blocks if( !rowwise && mcIn.getRows() > 1 ) { - for( MatrixBlock block : rix.values() ) - if( block.sparse ) - block.sortSparseRows(); + rix.values().stream().filter(b -> b.sparse) + .forEach(b -> b.sortSparseRows()); } } - private static void reshapeSparse( MatrixBlock in, long row_offset, long col_offset, - HashMap<MatrixIndexes,MatrixBlock> rix, + private static void reshapeSparse( MatrixBlock in, long row_offset, long col_offset, Map<MatrixIndexes,MatrixBlock> rix, MatrixCharacteristics mcIn, MatrixCharacteristics mcOut, boolean rowwise ) { if( in.isEmptyBlock(false) ) return; @@ -1639,35 +1611,36 @@ public class 
LibMatrixReorg //append all values to right blocks MatrixIndexes ixtmp = new MatrixIndexes(); - for( int i=0; i<rlen; i++ ) - { - if( !a.isEmpty(i) ) { - long ai = row_offset+i; - int apos = a.pos(i); - int alen = a.size(i); - int[] aix = a.indexes(i); - double[] avals = a.values(i); - - for( int j=apos; j<apos+alen; j++ ) { - long aj = col_offset+aix[j]; - ixtmp = computeResultBlockIndex(ixtmp, ai, aj, mcIn, mcOut, rowwise); - MatrixBlock out = rix.get(ixtmp); - if( out == null ) - throw new DMLRuntimeException("Missing result block: "+ixtmp); - ixtmp = computeInBlockIndex(ixtmp, ai, aj, mcIn, mcOut, rowwise); - out.appendValue((int)ixtmp.getRowIndex(),(int)ixtmp.getColumnIndex(), avals[j]); - } + for( int i=0; i<rlen; i++ ) { + if( a.isEmpty(i) ) continue; + long ai = row_offset+i; + int apos = a.pos(i); + int alen = a.size(i); + int[] aix = a.indexes(i); + double[] avals = a.values(i); + for( int j=apos; j<apos+alen; j++ ) { + long aj = col_offset+aix[j]; + ixtmp = computeResultBlockIndex(ixtmp, ai, aj, mcIn, mcOut, rowwise); + MatrixBlock out = getAllocatedBlock(rix, ixtmp); + ixtmp = computeInBlockIndex(ixtmp, ai, aj, mcIn, mcOut, rowwise); + out.appendValue((int)ixtmp.getRowIndex(),(int)ixtmp.getColumnIndex(), avals[j]); } } //cleanup for sparse blocks if( !rowwise && mcIn.getRows() > 1 ) { - for( MatrixBlock block : rix.values() ) - if( block.sparse ) - block.sortSparseRows(); + rix.values().stream().filter(b -> b.sparse) + .forEach(b -> b.sortSparseRows()); } } + private static MatrixBlock getAllocatedBlock(Map<MatrixIndexes,MatrixBlock> rix, MatrixIndexes ix) { + MatrixBlock out = rix.get(ix); + if( out == null ) + throw new DMLRuntimeException("Missing result block: "+ix); + return out; + } + /** * Assumes internal (0-begin) indices ai, aj as input; computes external block indexes (1-begin) * @@ -1682,25 +1655,27 @@ public class LibMatrixReorg private static MatrixIndexes computeResultBlockIndex( MatrixIndexes ixout, long ai, long aj, 
MatrixCharacteristics mcIn, MatrixCharacteristics mcOut, boolean rowwise ) { - long tempc = rowwise ? ai*mcIn.getCols()+aj : ai+mcIn.getRows()*aj; + long tempc = computeGlobalCellIndex(mcIn, ai, aj, rowwise); long ci = rowwise ? tempc/mcOut.getCols() : tempc%mcOut.getRows(); long cj = rowwise ? tempc%mcOut.getCols() : tempc/mcOut.getRows(); long bci = ci/mcOut.getRowsPerBlock() + 1; - long bcj = cj/mcOut.getColsPerBlock() + 1; - return (ixout != null) ? ixout.setIndexes(bci, bcj) : - new MatrixIndexes(bci, bcj); + long bcj = cj/mcOut.getColsPerBlock() + 1; + return ixout.setIndexes(bci, bcj); } - + private static MatrixIndexes computeInBlockIndex( MatrixIndexes ixout, long ai, long aj, MatrixCharacteristics mcIn, MatrixCharacteristics mcOut, boolean rowwise ) { - long tempc = rowwise ? ai*mcIn.getCols()+aj : ai+mcIn.getRows()*aj; + long tempc = computeGlobalCellIndex(mcIn, ai, aj, rowwise); long ci = rowwise ? (tempc/mcOut.getCols())%mcOut.getRowsPerBlock() : (tempc%mcOut.getRows())%mcOut.getRowsPerBlock(); long cj = rowwise ? (tempc%mcOut.getCols())%mcOut.getColsPerBlock() : (tempc/mcOut.getRows())%mcOut.getColsPerBlock(); - return (ixout != null) ? ixout.setIndexes(ci, cj) : - new MatrixIndexes(ci, cj); + return ixout.setIndexes(ci, cj); + } + + private static long computeGlobalCellIndex(MatrixCharacteristics mcIn, long ai, long aj, boolean rowwise) { + return rowwise ? 
ai*mcIn.getCols()+aj : ai+mcIn.getRows()*aj; } private static MatrixBlock removeEmptyRows(MatrixBlock in, MatrixBlock ret, MatrixBlock select, boolean emptyReturn) { http://git-wip-us.apache.org/repos/asf/systemml/blob/f5d97c55/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java b/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java index 1e1c003..0e77b8e 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java @@ -218,7 +218,7 @@ public class OperationsOnMatrixValues valueIn.aggregateUnaryOperations(op, valueOut, brlen, bclen, indexesIn); } - public static MatrixBlock performAggregateBinary(MatrixIndexes indexes1, MatrixBlock value1, MatrixIndexes indexes2, MatrixBlock value2, + public static MatrixBlock matMult(MatrixIndexes indexes1, MatrixBlock value1, MatrixIndexes indexes2, MatrixBlock value2, MatrixIndexes indexesOut, MatrixBlock valueOut, AggregateBinaryOperator op) { //compute output index indexesOut.setIndexes(indexes1.getRowIndex(), indexes2.getColumnIndex()); @@ -229,7 +229,7 @@ public class OperationsOnMatrixValues return value1.aggregateBinaryOperations(indexes1, value1, indexes2, value2, valueOut, op); } - public static MatrixBlock performAggregateBinaryIgnoreIndexes(MatrixBlock value1, MatrixBlock value2, + public static MatrixBlock matMult(MatrixBlock value1, MatrixBlock value2, MatrixBlock valueOut, AggregateBinaryOperator op) { //perform on the value if( value2 instanceof CompressedMatrixBlock ) http://git-wip-us.apache.org/repos/asf/systemml/blob/f5d97c55/src/main/java/org/apache/sysml/runtime/matrix/mapred/MMCJMRReducerWithAggregator.java 
---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/mapred/MMCJMRReducerWithAggregator.java b/src/main/java/org/apache/sysml/runtime/matrix/mapred/MMCJMRReducerWithAggregator.java index a9e1714..3d11062 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/mapred/MMCJMRReducerWithAggregator.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/mapred/MMCJMRReducerWithAggregator.java @@ -123,15 +123,15 @@ public class MMCJMRReducerWithAggregator extends MMCJMRCombinerReducerBase { //perform matrix multiplication indexesbuffer.setIndexes(tmp.getKey().getRowIndex(), inIndex); - OperationsOnMatrixValues.performAggregateBinaryIgnoreIndexes((MatrixBlock)tmp.getValue(), - (MatrixBlock)inValue, (MatrixBlock)valueBuffer, (AggregateBinaryOperator)aggBinInstruction.getOperator()); + OperationsOnMatrixValues.matMult((MatrixBlock)tmp.getValue(), (MatrixBlock)inValue, + (MatrixBlock)valueBuffer, (AggregateBinaryOperator)aggBinInstruction.getOperator()); } else //right cached { //perform matrix multiplication indexesbuffer.setIndexes(inIndex, tmp.getKey().getColumnIndex()); - OperationsOnMatrixValues.performAggregateBinaryIgnoreIndexes((MatrixBlock)inValue, - (MatrixBlock)tmp.getValue(), (MatrixBlock)valueBuffer, (AggregateBinaryOperator)aggBinInstruction.getOperator()); + OperationsOnMatrixValues.matMult((MatrixBlock)inValue, (MatrixBlock)tmp.getValue(), + (MatrixBlock)valueBuffer, (AggregateBinaryOperator)aggBinInstruction.getOperator()); } //aggregate block to output buffer or direct output
