[SYSTEMML-2504] In-place CP cumulative aggregates, incl compiler

This patch adds an option for in-place CP cumulative aggregates because
result allocation is the major bottleneck. As an initial compiler
integration, we now compile in-place CP operations for the aggregation
of partial aggregates in Spark cumsum, because in-place updates are
guaranteed to be valid there.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/25a10f41
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/25a10f41
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/25a10f41

Branch: refs/heads/master
Commit: 25a10f412614235d8974f371a2bb07bc08c88cee
Parents: 21b1a53
Author: Matthias Boehm <mboe...@gmail.com>
Authored: Wed Dec 5 20:38:37 2018 +0100
Committer: Matthias Boehm <mboe...@gmail.com>
Committed: Wed Dec 5 20:38:37 2018 +0100

----------------------------------------------------------------------
 .../java/org/apache/sysml/hops/UnaryOp.java     | 10 +++++-----
 src/main/java/org/apache/sysml/lops/Unary.java  |  7 +++++--
 .../instructions/cp/UnaryCPInstruction.java     |  5 +++--
 .../sysml/runtime/matrix/data/LibMatrixAgg.java | 20 +++++++++++++++-----
 .../runtime/matrix/operators/UnaryOperator.java | 10 ++++++++--
 5 files changed, 36 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/25a10f41/src/main/java/org/apache/sysml/hops/UnaryOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/UnaryOp.java 
b/src/main/java/org/apache/sysml/hops/UnaryOp.java
index d1110c3..4071d6f 100644
--- a/src/main/java/org/apache/sysml/hops/UnaryOp.java
+++ b/src/main/java/org/apache/sysml/hops/UnaryOp.java
@@ -170,7 +170,7 @@ public class UnaryOp extends MultiThreadedHop
                                        int k = isCumulativeUnaryOperation() || 
isExpensiveUnaryOperation() ?
                                                
OptimizerUtils.getConstrainedNumThreads( _maxNumThreads ) : 1;
                                        Unary unary1 = new 
Unary(input.constructLops(),
-                                               HopsOpOp1LopsU.get(_op), 
getDataType(), getValueType(), et, k);
+                                               HopsOpOp1LopsU.get(_op), 
getDataType(), getValueType(), et, k, false);
                                        setOutputDimensions(unary1);
                                        setLineNumbers(unary1);
                                        setLops(unary1);
@@ -404,15 +404,15 @@ public class UnaryOp extends MultiThreadedHop
                        agg.getOutputParameters().setDimensions(rlenAgg, clen, 
brlen, bclen, -1);
                        
agg.setupCorrectionLocation(CorrectionLocationType.NONE); // aggregation uses 
kahanSum but the inputs do not have correction values
                        setLineNumbers(agg);
-                       TEMP = agg;     
+                       TEMP = agg;
                        level++;
                        force = false; //in case of unknowns, generate one level
                }
                
                //in-memory cum sum (of partial aggregates)
                if( TEMP.getOutputParameters().getNumRows()!=1 ) {
-                       int k = OptimizerUtils.getConstrainedNumThreads( 
_maxNumThreads );                                      
-                       Unary unary1 = new Unary( TEMP, 
HopsOpOp1LopsU.get(_op), DataType.MATRIX, ValueType.DOUBLE, ExecType.CP, k);
+                       int k = OptimizerUtils.getConstrainedNumThreads( 
_maxNumThreads );
+                       Unary unary1 = new Unary( TEMP, 
HopsOpOp1LopsU.get(_op), DataType.MATRIX, ValueType.DOUBLE, ExecType.CP, k, 
true);
                        
unary1.getOutputParameters().setDimensions(TEMP.getOutputParameters().getNumRows(),
 clen, brlen, bclen, -1);
                        setLineNumbers(unary1);
                        TEMP = unary1;
@@ -487,7 +487,7 @@ public class UnaryOp extends MultiThreadedHop
                //in-memory cum sum (of partial aggregates)
                if( TEMP.getOutputParameters().getNumRows()!=1 ){
                        int k = OptimizerUtils.getConstrainedNumThreads( 
_maxNumThreads );
-                       Unary unary1 = new Unary( TEMP, 
HopsOpOp1LopsU.get(_op), DataType.MATRIX, ValueType.DOUBLE, ExecType.CP, k);
+                       Unary unary1 = new Unary( TEMP, 
HopsOpOp1LopsU.get(_op), DataType.MATRIX, ValueType.DOUBLE, ExecType.CP, k, 
true);
                        
unary1.getOutputParameters().setDimensions(TEMP.getOutputParameters().getNumRows(),
 clen, brlen, bclen, -1);
                        setLineNumbers(unary1);
                        TEMP = unary1;

http://git-wip-us.apache.org/repos/asf/systemml/blob/25a10f41/src/main/java/org/apache/sysml/lops/Unary.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/lops/Unary.java 
b/src/main/java/org/apache/sysml/lops/Unary.java
index c6f3151..f299603 100644
--- a/src/main/java/org/apache/sysml/lops/Unary.java
+++ b/src/main/java/org/apache/sysml/lops/Unary.java
@@ -53,7 +53,7 @@ public class Unary extends Lop
        
        //cp-specific parameters
        private int _numThreads = 1;
-
+       private boolean _inplace = false;
 
        /**
         * Constructor to perform a unary operation with 2 inputs
@@ -114,10 +114,11 @@ public class Unary extends Lop
         * @param et execution type
         * @param numThreads number of threads
         */
-       public Unary(Lop input1, OperationTypes op, DataType dt, ValueType vt, 
ExecType et, int numThreads) {
+       public Unary(Lop input1, OperationTypes op, DataType dt, ValueType vt, 
ExecType et, int numThreads, boolean inplace) {
                super(Lop.Type.UNARY, dt, vt);
                init(input1, op, dt, vt, et);
                _numThreads = numThreads;
+               _inplace = inplace;
        }
 
        private void init(Lop input1, OperationTypes op, DataType dt, ValueType 
vt, ExecType et) {
@@ -361,6 +362,8 @@ public class Unary extends Lop
                if( getExecType() == ExecType.CP && 
isMultiThreadedOp(operation) ) {
                        sb.append( OPERAND_DELIMITOR );
                        sb.append( _numThreads );
+                       sb.append( OPERAND_DELIMITOR );
+                       sb.append( _inplace );
                }
                
                return sb.toString();

http://git-wip-us.apache.org/repos/asf/systemml/blob/25a10f41/src/main/java/org/apache/sysml/runtime/instructions/cp/UnaryCPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/cp/UnaryCPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/cp/UnaryCPInstruction.java
index 9f5d71e..dcf9647 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/cp/UnaryCPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/cp/UnaryCPInstruction.java
@@ -56,14 +56,15 @@ public abstract class UnaryCPInstruction extends 
ComputationCPInstruction {
                ValueFunction func = null;
                
                //print or stop or cumulative aggregates
-               if( parts.length==4 ) {
+               if( parts.length==5 ) {
                        opcode = parts[0];
                        in.split(parts[1]);
                        out.split(parts[2]);
                        func = Builtin.getBuiltinFnObject(opcode);
                        
                        if( Arrays.asList(new 
String[]{"ucumk+","ucum*","ucumk+*","ucummin","ucummax","exp","log","sigmoid"}).contains(opcode)
 )
-                               return new UnaryMatrixCPInstruction(new 
UnaryOperator(func,Integer.parseInt(parts[3])), in, out, opcode, str); 
+                               return new UnaryMatrixCPInstruction(new 
UnaryOperator(func,
+                                       
Integer.parseInt(parts[3]),Boolean.parseBoolean(parts[4])), in, out, opcode, 
str);
                        else
                                return new UnaryScalarCPInstruction(null, in, 
out, opcode, str);
                }

http://git-wip-us.apache.org/repos/asf/systemml/blob/25a10f41/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
index c817a26..5e785d9 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
@@ -299,8 +299,13 @@ public class LibMatrixAgg
                }
                
                //allocate output arrays (if required)
-               out.reset(m2, n2, false); //always dense
-               out.allocateDenseBlock();
+               if( !uop.isInplace() || in.isInSparseFormat() ) {
+                       out.reset(m2, n2, false); //always dense
+                       out.allocateDenseBlock();
+               }
+               else {
+                       out = in;
+               }
                
                //Timing time = new Timing(true);
                
@@ -337,13 +342,18 @@ public class LibMatrixAgg
                //filter empty input blocks (incl special handling for 
sparse-unsafe operations)
                if( in.isEmptyBlock(false) ){
                        return aggregateUnaryMatrixEmpty(in, out, aggtype, 
null);
-               }       
+               }
 
                //Timing time = new Timing(true);
                
                //allocate output arrays (if required)
-               out.reset(m2, n2, false); //always dense
-               out.allocateDenseBlock();
+               if( !uop.isInplace() || in.isInSparseFormat() ) {
+                       out.reset(m2, n2, false); //always dense
+                       out.allocateDenseBlock();
+               }
+               else {
+                       out = in;
+               }
                
                //core multi-threaded unary aggregate computation
                //(currently: always parallelization over number of rows)

http://git-wip-us.apache.org/repos/asf/systemml/blob/25a10f41/src/main/java/org/apache/sysml/runtime/matrix/operators/UnaryOperator.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/operators/UnaryOperator.java 
b/src/main/java/org/apache/sysml/runtime/matrix/operators/UnaryOperator.java
index 8b3888e..b24ccf9 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/operators/UnaryOperator.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/operators/UnaryOperator.java
@@ -29,12 +29,13 @@ public class UnaryOperator extends Operator
 
        public final ValueFunction fn;
        private final int k; //num threads
+       private final boolean inplace;
 
        public UnaryOperator(ValueFunction p) {
-               this(p, 1); //default single-threaded
+               this(p, 1, false); //default single-threaded
        }
        
-       public UnaryOperator(ValueFunction p, int numThreads) {
+       public UnaryOperator(ValueFunction p, int numThreads, boolean inPlace) {
                super(p instanceof Builtin &&
                        (((Builtin)p).bFunc==Builtin.BuiltinCode.SIN || 
((Builtin)p).bFunc==Builtin.BuiltinCode.TAN 
                        // sinh and tanh are zero only at zero, else they are 
nnz
@@ -44,9 +45,14 @@ public class UnaryOperator extends Operator
                        || ((Builtin)p).bFunc==Builtin.BuiltinCode.LOG_NZ || 
((Builtin)p).bFunc==Builtin.BuiltinCode.SIGN) );
                fn = p;
                k = numThreads;
+               inplace = inPlace;
        }
        
        public int getNumThreads() {
                return k;
        }
+       
+       public boolean isInplace() {
+               return inplace;
+       }
 }

Reply via email to