[SYSTEMML-2237] Performance spark mapmm (lazy-iter) / reshape (sparse)

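This patch makes a minor performance improvement to Spark mapmm
operations that require a flatMap, by returning a lazy iterator instead
of a fully materialized list; this has the potential to improve memory
efficiency and thus reduce unnecessary GC overhead. Furthermore, the
patch includes some cleanups, including of the Spark reshape operations
(removal of the unused block-reuse path) and the renaming of
performAggregateBinary[IgnoreIndexes] to matMult.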


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/f5d97c55
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/f5d97c55
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/f5d97c55

Branch: refs/heads/master
Commit: f5d97c55162bc9c7db57e685c04be9aafe9657f6
Parents: 6aaea2f
Author: Matthias Boehm <[email protected]>
Authored: Mon Apr 16 23:41:31 2018 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Mon Apr 16 23:41:31 2018 -0700

----------------------------------------------------------------------
 .../mr/AggregateBinaryInstruction.java          |   6 +-
 .../mr/MatrixReshapeMRInstruction.java          |  22 +--
 .../instructions/spark/CpmmSPInstruction.java   |   4 +-
 .../instructions/spark/MapmmSPInstruction.java  |  49 +++---
 .../spark/MatrixReshapeSPInstruction.java       |   6 +-
 .../instructions/spark/PMapmmSPInstruction.java |   4 +-
 .../instructions/spark/RmmSPInstruction.java    |   3 +-
 .../instructions/spark/Tsmm2SPInstruction.java  |   4 +-
 .../instructions/spark/ZipmmSPInstruction.java  |   5 +-
 .../runtime/matrix/data/LibMatrixReorg.java     | 151 ++++++++-----------
 .../matrix/data/OperationsOnMatrixValues.java   |   4 +-
 .../mapred/MMCJMRReducerWithAggregator.java     |   8 +-
 12 files changed, 110 insertions(+), 156 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/f5d97c55/src/main/java/org/apache/sysml/runtime/instructions/mr/AggregateBinaryInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/mr/AggregateBinaryInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/mr/AggregateBinaryInstruction.java
index 2e75c9a..3c4a10c 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/mr/AggregateBinaryInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/mr/AggregateBinaryInstruction.java
@@ -154,7 +154,7 @@ public class AggregateBinaryInstruction extends 
BinaryMRInstructionBase implemen
                                out=cachedValues.holdPlace(output, valueClass);
 
                        //process instruction
-                       OperationsOnMatrixValues.performAggregateBinary(
+                       OperationsOnMatrixValues.matMult(
                                in1.getIndexes(), (MatrixBlock) in1.getValue(),
                                in2.getIndexes(), (MatrixBlock) in2.getValue(),
                                out.getIndexes(), (MatrixBlock) out.getValue(),
@@ -200,7 +200,7 @@ public class AggregateBinaryInstruction extends 
BinaryMRInstructionBase implemen
                                IndexedMatrixValue out = 
cachedValues.holdPlace(output, valueClass);
                                
                                //process instruction
-                               
OperationsOnMatrixValues.performAggregateBinary(in1.getIndexes(), 
(MatrixBlock)in1.getValue(), 
+                               
OperationsOnMatrixValues.matMult(in1.getIndexes(), (MatrixBlock)in1.getValue(), 
                                        in2BlockIndex, (MatrixBlock) 
in2BlockValue, out.getIndexes(), (MatrixBlock)out.getValue(), 
                                        ((AggregateBinaryOperator)optr));
                                removeOutput &= ( !_outputEmptyBlocks && 
out.getValue().isEmpty() );
@@ -227,7 +227,7 @@ public class AggregateBinaryInstruction extends 
BinaryMRInstructionBase implemen
                                IndexedMatrixValue out = 
cachedValues.holdPlace(output, valueClass);
                                
                                //process instruction
-                               OperationsOnMatrixValues.performAggregateBinary(
+                               OperationsOnMatrixValues.matMult(
                                        in1BlockIndex, 
(MatrixBlock)in1BlockValue,
                                        in2.getIndexes(), 
(MatrixBlock)in2.getValue(),
                                        out.getIndexes(), 
(MatrixBlock)out.getValue(),

http://git-wip-us.apache.org/repos/asf/systemml/blob/f5d97c55/src/main/java/org/apache/sysml/runtime/instructions/mr/MatrixReshapeMRInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/mr/MatrixReshapeMRInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/mr/MatrixReshapeMRInstruction.java
index fc1e05f..802851c 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/mr/MatrixReshapeMRInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/mr/MatrixReshapeMRInstruction.java
@@ -20,6 +20,7 @@
 package org.apache.sysml.runtime.instructions.mr;
 
 import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
@@ -36,8 +37,6 @@ public class MatrixReshapeMRInstruction extends 
UnaryInstruction {
        private MatrixCharacteristics _mcIn = null;
        private MatrixCharacteristics _mcOut = null;
 
-       private ArrayList<IndexedMatrixValue> _cache = null;
-
        private MatrixReshapeMRInstruction(Operator op, byte in, long rows, 
long cols, boolean byrow, byte out,
                        String istr) {
                super(MRType.MMTSJ, op, in, out, istr);
@@ -46,8 +45,7 @@ public class MatrixReshapeMRInstruction extends 
UnaryInstruction {
                _byrow = byrow;
        }
 
-       public void setMatrixCharacteristics( MatrixCharacteristics mcIn, 
MatrixCharacteristics mcOut )
-       {
+       public void setMatrixCharacteristics( MatrixCharacteristics mcIn, 
MatrixCharacteristics mcOut ) {
                _mcIn = mcIn;
        }
 
@@ -73,25 +71,17 @@ public class MatrixReshapeMRInstruction extends 
UnaryInstruction {
        {
                ArrayList<IndexedMatrixValue> blkList = cachedValues.get(input);
                if( blkList != null )
-                       for(IndexedMatrixValue imv : blkList)
-                       {
-                               if( imv == null )
-                                       continue;
+                       for(IndexedMatrixValue imv : blkList) {
+                               if( imv == null ) continue;
                                
-                               //get cached blocks
-                               ArrayList<IndexedMatrixValue> out = _cache;
-       
                                //process instruction
                                _mcOut.setBlockSize(brlen, bclen);
-                               out = LibMatrixReorg.reshape(imv, _mcIn, out, 
_mcOut, _byrow, true);
+                               List<IndexedMatrixValue> out =
+                                       LibMatrixReorg.reshape(imv, _mcIn, 
_mcOut, _byrow, true);
                                
                                //put the output values in the output cache
                                for( IndexedMatrixValue outBlk : out )
                                        cachedValues.add(output, outBlk);
-                               
-                               //put blocks into own cache
-                               if( LibMatrixReorg.ALLOW_BLOCK_REUSE )
-                                       _cache = out;   
                        }
        }
        

http://git-wip-us.apache.org/repos/asf/systemml/blob/f5d97c55/src/main/java/org/apache/sysml/runtime/instructions/spark/CpmmSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/CpmmSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/CpmmSPInstruction.java
index de08d83..308e60f 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/CpmmSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/CpmmSPInstruction.java
@@ -207,7 +207,7 @@ public class CpmmSPInstruction extends BinarySPInstruction {
                        
                        //core block matrix multiplication 
                        MatrixBlock blkOut = OperationsOnMatrixValues
-                               .performAggregateBinaryIgnoreIndexes(blkIn1, 
blkIn2, new MatrixBlock(), _op);
+                               .matMult(blkIn1, blkIn2, new MatrixBlock(), 
_op);
                        
                        //return target block
                        
ixOut.setIndexes(arg0._2()._1().getIndexes().getRowIndex(),
@@ -236,7 +236,7 @@ public class CpmmSPInstruction extends BinarySPInstruction {
                                .reorgOperations(_rop, new MatrixBlock(), 0, 0, 
0);
                        //core block matrix multiplication
                        return OperationsOnMatrixValues
-                               .performAggregateBinaryIgnoreIndexes(in1, in2, 
new MatrixBlock(), _op);
+                               .matMult(in1, in2, new MatrixBlock(), _op);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/f5d97c55/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
index 21c1be3..7c8d606 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
@@ -20,8 +20,8 @@
 package org.apache.sysml.runtime.instructions.spark;
 
 
-import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.stream.IntStream;
 
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
@@ -275,8 +275,8 @@ public class MapmmSPInstruction extends BinarySPInstruction 
{
                                MatrixBlock left = _pbc.getBlock(1, 
(int)ixIn.getRowIndex());
                                
                                //execute matrix-vector mult
-                               
OperationsOnMatrixValues.performAggregateBinary( 
-                                               new 
MatrixIndexes(1,ixIn.getRowIndex()), left, ixIn, blkIn, ixOut, blkOut, _op);
+                               OperationsOnMatrixValues.matMult(new 
MatrixIndexes(1,ixIn.getRowIndex()),
+                                       left, ixIn, blkIn, ixOut, blkOut, _op);
                        }
                        else //if( _type == CacheType.RIGHT )
                        {
@@ -284,8 +284,8 @@ public class MapmmSPInstruction extends BinarySPInstruction 
{
                                MatrixBlock right = 
_pbc.getBlock((int)ixIn.getColumnIndex(), 1);
                                
                                //execute matrix-vector mult
-                               OperationsOnMatrixValues.performAggregateBinary(
-                                               ixIn, blkIn, new 
MatrixIndexes(ixIn.getColumnIndex(),1), right, ixOut, blkOut, _op);
+                               OperationsOnMatrixValues.matMult(ixIn, blkIn,
+                                       new 
MatrixIndexes(ixIn.getColumnIndex(),1), right, ixOut, blkOut, _op);
                        }
                        
                        //output new tuple
@@ -327,7 +327,7 @@ public class MapmmSPInstruction extends BinarySPInstruction 
{
                                MatrixBlock left = _pbc.getBlock(1, 
(int)ixIn.getRowIndex());
                                
                                //execute matrix-vector mult
-                               return 
OperationsOnMatrixValues.performAggregateBinaryIgnoreIndexes( 
+                               return OperationsOnMatrixValues.matMult( 
                                        left, blkIn, new MatrixBlock(), _op);
                        }
                        else //if( _type == CacheType.RIGHT )
@@ -336,7 +336,7 @@ public class MapmmSPInstruction extends BinarySPInstruction 
{
                                MatrixBlock right = 
_pbc.getBlock((int)ixIn.getColumnIndex(), 1);
                                
                                //execute matrix-vector mult
-                               return 
OperationsOnMatrixValues.performAggregateBinaryIgnoreIndexes(
+                               return OperationsOnMatrixValues.matMult(
                                        blkIn, right, new MatrixBlock(), _op);
                        }
                }
@@ -392,7 +392,7 @@ public class MapmmSPInstruction extends BinarySPInstruction 
{
                                        MatrixBlock left = _pbc.getBlock(1, 
(int)ixIn.getRowIndex());
                                        
                                        //execute index preserving matrix 
multiplication
-                                       
OperationsOnMatrixValues.performAggregateBinaryIgnoreIndexes(left, blkIn, 
blkOut, _op);
+                                       OperationsOnMatrixValues.matMult(left, 
blkIn, blkOut, _op);
                                }
                                else //if( _type == CacheType.RIGHT )
                                {
@@ -400,7 +400,7 @@ public class MapmmSPInstruction extends BinarySPInstruction 
{
                                        MatrixBlock right = 
_pbc.getBlock((int)ixIn.getColumnIndex(), 1);
 
                                        //execute index preserving matrix 
multiplication
-                                       
OperationsOnMatrixValues.performAggregateBinaryIgnoreIndexes(blkIn, right, 
blkOut, _op);
+                                       OperationsOnMatrixValues.matMult(blkIn, 
right, blkOut, _op);
                                }
                        
                                return new Tuple2<>(ixIn, blkOut);
@@ -430,32 +430,23 @@ public class MapmmSPInstruction extends 
BinarySPInstruction {
                public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call( 
Tuple2<MatrixIndexes, MatrixBlock> arg0 ) 
                        throws Exception 
                {
-                       ArrayList<Tuple2<MatrixIndexes, MatrixBlock>> ret = new 
ArrayList<>();
                        MatrixIndexes ixIn = arg0._1();
                        MatrixBlock blkIn = arg0._2();
-
+                       
                        if( _type == CacheType.LEFT ) {
-                               //for all matching left-hand-side blocks
-                               int len = _pbc.getNumRowBlocks();
-                               for( int i=1; i<=len; i++ ) {
-                                       MatrixBlock left = _pbc.getBlock(i, 
(int)ixIn.getRowIndex());
-                                       MatrixBlock blkOut = 
OperationsOnMatrixValues
-                                               
.performAggregateBinaryIgnoreIndexes(left, blkIn, new MatrixBlock(), _op);
-                                       ret.add(new Tuple2<>(new 
MatrixIndexes(i, ixIn.getColumnIndex()), blkOut));
-                               }
+                               //for all matching left-hand-side blocks, 
returned as lazy iterator
+                               return IntStream.range(1, 
_pbc.getNumRowBlocks()+1).mapToObj(i ->
+                                       new Tuple2<>(new MatrixIndexes(i, 
ixIn.getColumnIndex()),
+                                       
OperationsOnMatrixValues.matMult(_pbc.getBlock(i, (int)ixIn.getRowIndex()), 
blkIn,
+                                               new MatrixBlock(), 
_op))).iterator();
                        }
                        else { //RIGHT
-                               //for all matching right-hand-side blocks
-                               int len = _pbc.getNumColumnBlocks();
-                               for( int j=1; j<=len; j++ )  {
-                                       MatrixBlock right = 
_pbc.getBlock((int)ixIn.getColumnIndex(), j);
-                                       MatrixBlock blkOut = 
OperationsOnMatrixValues
-                                               
.performAggregateBinaryIgnoreIndexes(blkIn, right, new MatrixBlock(), _op);
-                                       ret.add(new Tuple2<>(new 
MatrixIndexes(ixIn.getRowIndex(), j), blkOut));
-                               }
+                               //for all matching right-hand-side blocks, 
returned as lazy iterator
+                               return IntStream.range(1, 
_pbc.getNumColumnBlocks()+1).mapToObj(j ->
+                                       new Tuple2<>(new 
MatrixIndexes(ixIn.getRowIndex(), j),
+                                       OperationsOnMatrixValues.matMult(blkIn, 
_pbc.getBlock((int)ixIn.getColumnIndex(), j),
+                                               new MatrixBlock(), 
_op))).iterator();
                        }
-                       
-                       return ret.iterator();
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/f5d97c55/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixReshapeSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixReshapeSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixReshapeSPInstruction.java
index 8a1c325..97f112c 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixReshapeSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixReshapeSPInstruction.java
@@ -19,8 +19,8 @@
 
 package org.apache.sysml.runtime.instructions.spark;
 
-import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
@@ -138,8 +138,8 @@ public class MatrixReshapeSPInstruction extends 
UnarySPInstruction
                        IndexedMatrixValue in = 
SparkUtils.toIndexedMatrixBlock(arg0);
                        
                        //execute actual reshape operation
-                       ArrayList<IndexedMatrixValue> out = LibMatrixReorg
-                               .reshape(in, _mcIn, new ArrayList<>(), _mcOut, 
_byrow, _outputEmptyBlocks);
+                       List<IndexedMatrixValue> out = LibMatrixReorg
+                               .reshape(in, _mcIn, _mcOut, _byrow, 
_outputEmptyBlocks);
 
                        //output conversion (for compatibility w/ rdd schema)
                        return 
SparkUtils.fromIndexedMatrixBlock(out).iterator();

http://git-wip-us.apache.org/repos/asf/systemml/blob/f5d97c55/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java
index 2e7ac11..1b6435b 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java
@@ -193,8 +193,8 @@ public class PMapmmSPInstruction extends 
BinarySPInstruction {
                                MatrixBlock left = pm.getBlock(i, 
(int)ixIn.getRowIndex());
                        
                                //execute matrix-vector mult
-                               
OperationsOnMatrixValues.performAggregateBinary( 
-                                               new 
MatrixIndexes(i,ixIn.getRowIndex()), left, ixIn, blkIn, ixOut, blkOut, _op);
+                               OperationsOnMatrixValues.matMult(new 
MatrixIndexes(i,ixIn.getRowIndex()),
+                                       left, ixIn, blkIn, ixOut, blkOut, _op);
                                
                                //output new tuple
                                ixOut.setIndexes(_offset+i, 
ixOut.getColumnIndex());

http://git-wip-us.apache.org/repos/asf/systemml/blob/f5d97c55/src/main/java/org/apache/sysml/runtime/instructions/spark/RmmSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/RmmSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/RmmSPInstruction.java
index 294c142..90e5396 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/RmmSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/RmmSPInstruction.java
@@ -191,8 +191,7 @@ public class RmmSPInstruction extends BinarySPInstruction {
                        MatrixBlock blkIn2 = arg0._2()._2();
                        
                        //core block matrix multiplication 
-                       MatrixBlock blkOut = OperationsOnMatrixValues
-                               .performAggregateBinaryIgnoreIndexes(blkIn1, 
blkIn2, new MatrixBlock(), _op);
+                       MatrixBlock blkOut = 
OperationsOnMatrixValues.matMult(blkIn1, blkIn2, new MatrixBlock(), _op);
                        
                        //output new tuple
                        return new Tuple2<>(ixOut, blkOut);

http://git-wip-us.apache.org/repos/asf/systemml/blob/f5d97c55/src/main/java/org/apache/sysml/runtime/instructions/spark/Tsmm2SPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/Tsmm2SPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/Tsmm2SPInstruction.java
index cabc2c8..5bb686b 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/Tsmm2SPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/Tsmm2SPInstruction.java
@@ -154,7 +154,7 @@ public class Tsmm2SPInstruction extends UnarySPInstruction {
                                                
(int)(_type.isLeft()?1:ixin.getColumnIndex()));
                                MatrixBlock mbin2t = transpose(mbin2, new 
MatrixBlock()); //prep for transpose rewrite mm
                                
-                               MatrixBlock out2 = (MatrixBlock) 
OperationsOnMatrixValues.performAggregateBinaryIgnoreIndexes( //mm
+                               MatrixBlock out2 = (MatrixBlock) 
OperationsOnMatrixValues.matMult( //mm
                                                _type.isLeft() ? mbin2t : mbin, 
_type.isLeft() ? mbin : mbin2t, new MatrixBlock(), _op);
                                MatrixIndexes ixout2 = _type.isLeft() ? new 
MatrixIndexes(2,1) : new MatrixIndexes(1,2);
                                ret.add(new Tuple2<>(ixout2, out2));
@@ -215,7 +215,7 @@ public class Tsmm2SPInstruction extends UnarySPInstruction {
                                                
(int)(_type.isLeft()?1:ixin.getColumnIndex()));
                                MatrixBlock mbin2t = transpose(mbin2, new 
MatrixBlock()); //prep for transpose rewrite mm
                                
-                               MatrixBlock out2 = 
OperationsOnMatrixValues.performAggregateBinaryIgnoreIndexes( //mm
+                               MatrixBlock out2 = 
OperationsOnMatrixValues.matMult( //mm
                                                _type.isLeft() ? mbin2t : mbin, 
_type.isLeft() ? mbin : mbin2t, new MatrixBlock(), _op);
                                
                                MatrixIndexes ixout2 = _type.isLeft() ? new 
MatrixIndexes(2,1) : new MatrixIndexes(1,2);

http://git-wip-us.apache.org/repos/asf/systemml/blob/f5d97c55/src/main/java/org/apache/sysml/runtime/instructions/spark/ZipmmSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/ZipmmSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/ZipmmSPInstruction.java
index 4f168c1..42313fa 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/ZipmmSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/ZipmmSPInstruction.java
@@ -123,10 +123,9 @@ public class ZipmmSPInstruction extends 
BinarySPInstruction {
                        
                        //transpose right input (for vectors no-op)
                        MatrixBlock tmp = 
(MatrixBlock)in2.reorgOperations(_rop, new MatrixBlock(), 0, 0, 0);
-                               
+                       
                        //core matrix multiplication (for t(y)%*%X or t(X)%*%y)
-                       return OperationsOnMatrixValues
-                               .performAggregateBinaryIgnoreIndexes(tmp, in1, 
new MatrixBlock(), _abop);
+                       return OperationsOnMatrixValues.matMult(tmp, in1, new 
MatrixBlock(), _abop);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/f5d97c55/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
index 8de6f13..78b730c 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
@@ -28,10 +28,11 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map.Entry;
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.stream.Collectors;
 
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject.UpdateType;
@@ -69,9 +70,6 @@ public class LibMatrixReorg
        //safe due to copy-on-write and safe update-in-place handling)
        public static final boolean SHALLOW_COPY_REORG = true;
        
-       //allow reuse of temporary blocks for certain operations
-       public static final boolean ALLOW_BLOCK_REUSE = false;
-       
        //use csr instead of mcsr sparse block for rexpand columns / diag v2m
        public static final boolean SPARSE_OUTPUTS_IN_CSR = true;
        
@@ -465,15 +463,15 @@ public class LibMatrixReorg
         * @param outputEmptyBlocks output blocks with nnz=0
         * @return list of indexed matrix values
         */
-       public static ArrayList<IndexedMatrixValue> reshape( IndexedMatrixValue 
in, MatrixCharacteristics mcIn, 
-                       ArrayList<IndexedMatrixValue> out, 
MatrixCharacteristics mcOut, boolean rowwise, boolean outputEmptyBlocks ) {
+       public static List<IndexedMatrixValue> reshape(IndexedMatrixValue in, 
MatrixCharacteristics mcIn,
+                       MatrixCharacteristics mcOut, boolean rowwise, boolean 
outputEmptyBlocks ) {
                //prepare inputs
                MatrixIndexes ixIn = in.getIndexes();
                MatrixBlock mbIn = (MatrixBlock) in.getValue();
                
                //prepare result blocks (no reuse in order to guarantee mem 
constraints)
                Collection<MatrixIndexes> rix = 
computeAllResultBlockIndexes(ixIn, mcIn, mcOut, mbIn, rowwise, 
outputEmptyBlocks);
-               HashMap<MatrixIndexes, MatrixBlock> rblk = 
createAllResultBlocks(rix, mbIn.nonZeros, mcIn, mcOut, rowwise, out);
+               Map<MatrixIndexes, MatrixBlock> rblk = 
createAllResultBlocks(rix, mbIn.nonZeros, mcOut);
                
                //basic algorithm
                long row_offset = (ixIn.getRowIndex()-1)*mcIn.getRowsPerBlock();
@@ -483,15 +481,11 @@ public class LibMatrixReorg
                else //dense
                        reshapeDense(mbIn, row_offset, col_offset, rblk, mcIn, 
mcOut, rowwise);
                
-               //prepare output
-               out = new ArrayList<>();
-               for( Entry<MatrixIndexes, MatrixBlock> e : rblk.entrySet() )
-                       if( outputEmptyBlocks || 
!e.getValue().isEmptyBlock(false) ) {
-                               e.getValue().examSparsity(); //ensure correct 
format
-                               out.add(new 
IndexedMatrixValue(e.getKey(),e.getValue()));
-                       }
-               
-               return out;
+               //prepare output (sparsity switch, wrapper)
+               return rblk.entrySet().stream()
+                       .filter( e -> outputEmptyBlocks || 
!e.getValue().isEmptyBlock(false))
+                       .map(e -> {e.getValue().examSparsity(); return new 
IndexedMatrixValue(e.getKey(),e.getValue());})
+                       .collect(Collectors.toList());
        }
 
        /**
@@ -1552,46 +1546,26 @@ public class LibMatrixReorg
                                row_offset+cell.getI(), col_offset+cell.getJ(), 
mcIn, mcOut, rowwise));
                }
        }
-
-       @SuppressWarnings("unused")
-       private static HashMap<MatrixIndexes, MatrixBlock> 
createAllResultBlocks( Collection<MatrixIndexes> rix, 
-                       long nnz, MatrixCharacteristics mcIn, 
MatrixCharacteristics mcOut,
-                       boolean rowwise, ArrayList<IndexedMatrixValue> reuse )
-       {
-               HashMap<MatrixIndexes, MatrixBlock> ret = new 
HashMap<MatrixIndexes,MatrixBlock>();
-               long nBlocks = rix.size();
-               int count = 0;
-               
-               for( MatrixIndexes ix : rix )
-               {
-                       //compute indexes
-                       long bi = ix.getRowIndex();
-                       long bj = ix.getColumnIndex();
-                       int lbrlen = 
UtilFunctions.computeBlockSize(mcOut.getRows(), bi, mcOut.getRowsPerBlock());
-                       int lbclen = 
UtilFunctions.computeBlockSize(mcOut.getCols(), bj, mcOut.getColsPerBlock());
-                       
-                       //create result block
-                       int estnnz = (int) (nnz/nBlocks); //force 
initialcapacity per row to 1, for many blocks
-                       boolean sparse = 
MatrixBlock.evalSparseFormatInMemory(lbrlen, lbclen, estnnz);
-                       MatrixBlock block = null;
-                       if( ALLOW_BLOCK_REUSE && reuse!=null && 
!reuse.isEmpty()) {
-                               block = (MatrixBlock) 
reuse.get(count++).getValue();
-                               block.reset(lbrlen, lbclen, sparse, estnnz);
-                       }
-                       else
-                               block = new MatrixBlock(lbrlen, lbclen, sparse, 
estnnz); 
-                       
-                       if( lbrlen<1 || lbclen<1 )
-                               throw new RuntimeException("Computed block 
dimensions ("+bi+","+bj+" -> "+lbrlen+","+lbclen+") are invalid!");
-                       
-                       ret.put(ix, block);
-               }
-               
-               return ret;
+       
+       private static Map<MatrixIndexes, MatrixBlock> 
createAllResultBlocks(Collection<MatrixIndexes> rix, long nnz, 
MatrixCharacteristics mcOut) {
+               return rix.stream().collect(Collectors.toMap(ix -> ix, ix -> 
createResultBlock(ix, nnz, rix.size(), mcOut)));
+       }
+       
+       private static MatrixBlock createResultBlock(MatrixIndexes ix, long 
nnz, int nBlocks, MatrixCharacteristics mcOut) {
+               //compute indexes
+               long bi = ix.getRowIndex();
+               long bj = ix.getColumnIndex();
+               int lbrlen = UtilFunctions.computeBlockSize(mcOut.getRows(), 
bi, mcOut.getRowsPerBlock());
+               int lbclen = UtilFunctions.computeBlockSize(mcOut.getCols(), 
bj, mcOut.getColsPerBlock());
+               if( lbrlen<1 || lbclen<1 )
+                       throw new DMLRuntimeException("Computed block 
dimensions ("+bi+","+bj+" -> "+lbrlen+","+lbclen+") are invalid!");
+               //create result block
+               int estnnz = (int) (nnz/nBlocks); //force initial capacity per 
row to 1, for many blocks
+               boolean sparse = MatrixBlock.evalSparseFormatInMemory(lbrlen, 
lbclen, estnnz);
+               return new MatrixBlock(lbrlen, lbclen, sparse, estnnz); 
        }
 
-       private static void reshapeDense( MatrixBlock in, long row_offset, long 
col_offset, 
-                       HashMap<MatrixIndexes,MatrixBlock> rix,
+       private static void reshapeDense( MatrixBlock in, long row_offset, long 
col_offset, Map<MatrixIndexes,MatrixBlock> rix,
                        MatrixCharacteristics mcIn, MatrixCharacteristics 
mcOut, boolean rowwise ) {
                if( in.isEmptyBlock(false) )
                        return;
@@ -1622,14 +1596,12 @@ public class LibMatrixReorg
                
                //cleanup for sparse blocks
                if( !rowwise && mcIn.getRows() > 1 ) {
-                       for( MatrixBlock block : rix.values() )
-                               if( block.sparse )
-                                       block.sortSparseRows();
+                       rix.values().stream().filter(b -> b.sparse)
+                               .forEach(b -> b.sortSparseRows());
                }
        }
 
-       private static void reshapeSparse( MatrixBlock in, long row_offset, 
long col_offset, 
-                       HashMap<MatrixIndexes,MatrixBlock> rix, 
+       private static void reshapeSparse( MatrixBlock in, long row_offset, 
long col_offset, Map<MatrixIndexes,MatrixBlock> rix, 
                        MatrixCharacteristics mcIn, MatrixCharacteristics 
mcOut, boolean rowwise ) {
                if( in.isEmptyBlock(false) )
                        return;
@@ -1639,35 +1611,36 @@ public class LibMatrixReorg
                
                //append all values to right blocks
                MatrixIndexes ixtmp = new MatrixIndexes();
-               for( int i=0; i<rlen; i++ )
-               {
-                       if( !a.isEmpty(i) ) {
-                               long ai = row_offset+i;
-                               int apos = a.pos(i);
-                               int alen = a.size(i);
-                               int[] aix = a.indexes(i);
-                               double[] avals = a.values(i);
-                               
-                               for( int j=apos; j<apos+alen; j++ )  {
-                                       long aj = col_offset+aix[j];
-                                       ixtmp = computeResultBlockIndex(ixtmp, 
ai, aj, mcIn, mcOut, rowwise);
-                                       MatrixBlock out = rix.get(ixtmp);
-                                       if( out == null )
-                                               throw new 
DMLRuntimeException("Missing result block: "+ixtmp);
-                                       ixtmp = computeInBlockIndex(ixtmp, ai, 
aj, mcIn, mcOut, rowwise);
-                                       
out.appendValue((int)ixtmp.getRowIndex(),(int)ixtmp.getColumnIndex(), avals[j]);
-                               }
+               for( int i=0; i<rlen; i++ ) {
+                       if( a.isEmpty(i) ) continue;
+                       long ai = row_offset+i;
+                       int apos = a.pos(i);
+                       int alen = a.size(i);
+                       int[] aix = a.indexes(i);
+                       double[] avals = a.values(i);
+                       for( int j=apos; j<apos+alen; j++ )  {
+                               long aj = col_offset+aix[j];
+                               ixtmp = computeResultBlockIndex(ixtmp, ai, aj, 
mcIn, mcOut, rowwise);
+                               MatrixBlock out = getAllocatedBlock(rix, ixtmp);
+                               ixtmp = computeInBlockIndex(ixtmp, ai, aj, 
mcIn, mcOut, rowwise);
+                               
out.appendValue((int)ixtmp.getRowIndex(),(int)ixtmp.getColumnIndex(), avals[j]);
                        }
                }
                
                //cleanup for sparse blocks
                if( !rowwise && mcIn.getRows() > 1 ) {
-                       for( MatrixBlock block : rix.values() )
-                               if( block.sparse )
-                                       block.sortSparseRows();
+                       rix.values().stream().filter(b -> b.sparse)
+                               .forEach(b -> b.sortSparseRows());
                }
        }
        
+       private static MatrixBlock 
getAllocatedBlock(Map<MatrixIndexes,MatrixBlock> rix, MatrixIndexes ix) {
+               MatrixBlock out = rix.get(ix);
+               if( out == null )
+                       throw new DMLRuntimeException("Missing result block: 
"+ix);
+               return out;
+       }
+       
        /**
         * Assumes internal (0-begin) indices ai, aj as input; computes 
external block indexes (1-begin) 
         * 
@@ -1682,25 +1655,27 @@ public class LibMatrixReorg
        private static MatrixIndexes computeResultBlockIndex( MatrixIndexes 
ixout, long ai, long aj,
                MatrixCharacteristics mcIn, MatrixCharacteristics mcOut, 
boolean rowwise )
        {
-               long tempc = rowwise ? ai*mcIn.getCols()+aj : 
ai+mcIn.getRows()*aj;
+               long tempc = computeGlobalCellIndex(mcIn, ai, aj, rowwise);
                long ci = rowwise ? tempc/mcOut.getCols() : 
tempc%mcOut.getRows();
                long cj = rowwise ? tempc%mcOut.getCols() : 
tempc/mcOut.getRows();
                long bci = ci/mcOut.getRowsPerBlock() + 1;
-               long bcj = cj/mcOut.getColsPerBlock() + 1; 
-               return (ixout != null) ? ixout.setIndexes(bci, bcj) :
-                       new MatrixIndexes(bci, bcj);
+               long bcj = cj/mcOut.getColsPerBlock() + 1;
+               return ixout.setIndexes(bci, bcj);
        }
-
+       
        private static MatrixIndexes computeInBlockIndex( MatrixIndexes ixout, 
long ai, long aj,
                MatrixCharacteristics mcIn, MatrixCharacteristics mcOut, 
boolean rowwise )
        {
-               long tempc = rowwise ? ai*mcIn.getCols()+aj : 
ai+mcIn.getRows()*aj;
+               long tempc = computeGlobalCellIndex(mcIn, ai, aj, rowwise);
                long ci = rowwise ? 
(tempc/mcOut.getCols())%mcOut.getRowsPerBlock() : 
                        (tempc%mcOut.getRows())%mcOut.getRowsPerBlock();
                long cj = rowwise ? 
(tempc%mcOut.getCols())%mcOut.getColsPerBlock() : 
                        (tempc/mcOut.getRows())%mcOut.getColsPerBlock();
-               return (ixout != null) ? ixout.setIndexes(ci, cj) :
-                       new MatrixIndexes(ci, cj);
+               return ixout.setIndexes(ci, cj);
+       }
+       
+       private static long computeGlobalCellIndex(MatrixCharacteristics mcIn, 
long ai, long aj, boolean rowwise) {
+               return rowwise ? ai*mcIn.getCols()+aj : ai+mcIn.getRows()*aj;
        }
 
        private static MatrixBlock removeEmptyRows(MatrixBlock in, MatrixBlock 
ret, MatrixBlock select, boolean emptyReturn) {

http://git-wip-us.apache.org/repos/asf/systemml/blob/f5d97c55/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java
 
b/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java
index 1e1c003..0e77b8e 100644
--- 
a/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java
+++ 
b/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java
@@ -218,7 +218,7 @@ public class OperationsOnMatrixValues
                valueIn.aggregateUnaryOperations(op, valueOut, brlen, bclen, 
indexesIn);
        }
        
-       public static MatrixBlock performAggregateBinary(MatrixIndexes 
indexes1, MatrixBlock value1, MatrixIndexes indexes2, MatrixBlock value2, 
+       public static MatrixBlock matMult(MatrixIndexes indexes1, MatrixBlock 
value1, MatrixIndexes indexes2, MatrixBlock value2, 
                        MatrixIndexes indexesOut, MatrixBlock valueOut, 
AggregateBinaryOperator op) {
                //compute output index
                indexesOut.setIndexes(indexes1.getRowIndex(), 
indexes2.getColumnIndex());
@@ -229,7 +229,7 @@ public class OperationsOnMatrixValues
                        return value1.aggregateBinaryOperations(indexes1, 
value1, indexes2, value2, valueOut, op);
        }
 
-       public static MatrixBlock 
performAggregateBinaryIgnoreIndexes(MatrixBlock value1, MatrixBlock value2,
+       public static MatrixBlock matMult(MatrixBlock value1, MatrixBlock 
value2,
                        MatrixBlock valueOut, AggregateBinaryOperator op) {
                //perform on the value
                if( value2 instanceof CompressedMatrixBlock )

http://git-wip-us.apache.org/repos/asf/systemml/blob/f5d97c55/src/main/java/org/apache/sysml/runtime/matrix/mapred/MMCJMRReducerWithAggregator.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/mapred/MMCJMRReducerWithAggregator.java
 
b/src/main/java/org/apache/sysml/runtime/matrix/mapred/MMCJMRReducerWithAggregator.java
index a9e1714..3d11062 100644
--- 
a/src/main/java/org/apache/sysml/runtime/matrix/mapred/MMCJMRReducerWithAggregator.java
+++ 
b/src/main/java/org/apache/sysml/runtime/matrix/mapred/MMCJMRReducerWithAggregator.java
@@ -123,15 +123,15 @@ public class MMCJMRReducerWithAggregator extends 
MMCJMRCombinerReducerBase
                                        {
                                                //perform matrix multiplication
                                                
indexesbuffer.setIndexes(tmp.getKey().getRowIndex(), inIndex);
-                                               
OperationsOnMatrixValues.performAggregateBinaryIgnoreIndexes((MatrixBlock)tmp.getValue(),
-                                                       (MatrixBlock)inValue, 
(MatrixBlock)valueBuffer, 
(AggregateBinaryOperator)aggBinInstruction.getOperator());
+                                               
OperationsOnMatrixValues.matMult((MatrixBlock)tmp.getValue(), 
(MatrixBlock)inValue,
+                                                       
(MatrixBlock)valueBuffer, 
(AggregateBinaryOperator)aggBinInstruction.getOperator());
                                        }
                                        else //right cached
                                        {
                                                //perform matrix multiplication
                                                
indexesbuffer.setIndexes(inIndex, tmp.getKey().getColumnIndex());
-                                               
OperationsOnMatrixValues.performAggregateBinaryIgnoreIndexes((MatrixBlock)inValue,
-                                                       
(MatrixBlock)tmp.getValue(), (MatrixBlock)valueBuffer, 
(AggregateBinaryOperator)aggBinInstruction.getOperator());
+                                               
OperationsOnMatrixValues.matMult((MatrixBlock)inValue, 
(MatrixBlock)tmp.getValue(),
+                                                       
(MatrixBlock)valueBuffer, 
(AggregateBinaryOperator)aggBinInstruction.getOperator());
                                        }
                                        
                                        //aggregate block to output buffer or 
direct output

Reply via email to