[SYSTEMML-2262] New multi-threaded dense unary ops (exp, log, sigmoid)

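This patch introduces best-effort multi-threading for expensive dense
unary operations such as exp, log, and sigmoid. For other unary
operations, output allocation and the memory reads and writes dominate
the runtime anyway. These expensive operations, however, see a
substantial speedup from multi-threading, which we simply apply via
Arrays.parallelSetAll. The parallel path is only taken for non-empty,
contiguous dense inputs when the requested number of threads equals the
maximum local parallelism.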


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/4c7640b8
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/4c7640b8
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/4c7640b8

Branch: refs/heads/master
Commit: 4c7640b873dfa921409cbef375c856aa48940932
Parents: 45d86bd
Author: Matthias Boehm <[email protected]>
Authored: Fri Apr 20 01:39:15 2018 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Fri Apr 20 01:39:15 2018 -0700

----------------------------------------------------------------------
 .../org/apache/sysml/hops/OptimizerUtils.java   | 16 ++++-----
 .../java/org/apache/sysml/hops/UnaryOp.java     | 36 ++++++++++++--------
 src/main/java/org/apache/sysml/lops/Unary.java  |  9 +++--
 .../parfor/opt/OptimizerRuleBased.java          |  3 +-
 .../instructions/CPInstructionParser.java       |  4 ++-
 .../instructions/cp/UnaryCPInstruction.java     |  2 +-
 .../sysml/runtime/matrix/data/MatrixBlock.java  | 17 ++++++---
 7 files changed, 54 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/4c7640b8/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java 
b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
index 5ae20eb..2d76759 100644
--- a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
+++ b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
@@ -414,30 +414,30 @@ public class OptimizerUtils
         * 
         * @return local memory budget
         */
-       public static double getLocalMemBudget()
-       {
+       public static double getLocalMemBudget() {
                double ret = InfrastructureAnalyzer.getLocalMaxMemory();
                return ret * OptimizerUtils.MEM_UTIL_FACTOR;
        }
        
-       public static double getRemoteMemBudgetMap()
-       {
+       public static double getRemoteMemBudgetMap() {
                return getRemoteMemBudgetMap(false);
        }
        
-       public static double getRemoteMemBudgetMap(boolean substractSortBuffer)
-       {
+       public static double getRemoteMemBudgetMap(boolean substractSortBuffer) 
{
                double ret = InfrastructureAnalyzer.getRemoteMaxMemoryMap();
                if( substractSortBuffer )
                        ret -= 
InfrastructureAnalyzer.getRemoteMaxMemorySortBuffer();
                return ret * OptimizerUtils.MEM_UTIL_FACTOR;
        }
 
-       public static double getRemoteMemBudgetReduce()
-       {
+       public static double getRemoteMemBudgetReduce() {
                double ret = InfrastructureAnalyzer.getRemoteMaxMemoryReduce();
                return ret * OptimizerUtils.MEM_UTIL_FACTOR;
        }
+       
+       public static boolean isMaxLocalParallelism(int k) {
+               return InfrastructureAnalyzer.getLocalParallelism() == k;
+       }
 
        public static boolean checkSparkBroadcastMemoryBudget( double size )
        {

http://git-wip-us.apache.org/repos/asf/systemml/blob/4c7640b8/src/main/java/org/apache/sysml/hops/UnaryOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/UnaryOp.java 
b/src/main/java/org/apache/sysml/hops/UnaryOp.java
index a6a9d66..7b7c685 100644
--- a/src/main/java/org/apache/sysml/hops/UnaryOp.java
+++ b/src/main/java/org/apache/sysml/hops/UnaryOp.java
@@ -177,9 +177,10 @@ public class UnaryOp extends Hop implements 
MultiThreadedHop
                                }
                                else //default unary 
                                {
-                                       int k = isCumulativeUnaryOperation() ? 
OptimizerUtils.getConstrainedNumThreads( _maxNumThreads ) : 1;
-                                       Unary unary1 = new 
Unary(input.constructLops(), HopsOpOp1LopsU.get(_op), 
-                                                                        
getDataType(), getValueType(), et, k);
+                                       int k = isCumulativeUnaryOperation() || 
isExpensiveUnaryOperation() ?
+                                               
OptimizerUtils.getConstrainedNumThreads( _maxNumThreads ) : 1;
+                                       Unary unary1 = new 
Unary(input.constructLops(),
+                                               HopsOpOp1LopsU.get(_op), 
getDataType(), getValueType(), et, k);
                                        setOutputDimensions(unary1);
                                        setLineNumbers(unary1);
                                        setLops(unary1);
@@ -612,21 +613,26 @@ public class UnaryOp extends Hop implements 
MultiThreadedHop
                return ( _op == OpOp1.INVERSE );
        }
 
-       public boolean isCumulativeUnaryOperation() 
-       {
-               return (   _op == OpOp1.CUMSUM 
-                               || _op == OpOp1.CUMPROD
-                               || _op == OpOp1.CUMMIN
-                               || _op == OpOp1.CUMMAX  );
+       public boolean isCumulativeUnaryOperation()  {
+               return (_op == OpOp1.CUMSUM 
+                       || _op == OpOp1.CUMPROD
+                       || _op == OpOp1.CUMMIN
+                       || _op == OpOp1.CUMMAX);
        }
 
        public boolean isCastUnaryOperation() {
-               return (   _op == OpOp1.CAST_AS_MATRIX
-                               || _op == OpOp1.CAST_AS_SCALAR
-                               || _op == OpOp1.CAST_AS_FRAME
-                               || _op == OpOp1.CAST_AS_BOOLEAN
-                               || _op == OpOp1.CAST_AS_DOUBLE
-                               || _op == OpOp1.CAST_AS_INT    );
+               return (_op == OpOp1.CAST_AS_MATRIX
+                       || _op == OpOp1.CAST_AS_SCALAR
+                       || _op == OpOp1.CAST_AS_FRAME
+                       || _op == OpOp1.CAST_AS_BOOLEAN
+                       || _op == OpOp1.CAST_AS_DOUBLE
+                       || _op == OpOp1.CAST_AS_INT);
+       }
+       
+       public boolean isExpensiveUnaryOperation()  {
+               return (_op == OpOp1.EXP 
+                       || _op == OpOp1.LOG
+                       || _op == OpOp1.SIGMOID);
        }
        
        @Override

http://git-wip-us.apache.org/repos/asf/systemml/blob/4c7640b8/src/main/java/org/apache/sysml/lops/Unary.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/lops/Unary.java 
b/src/main/java/org/apache/sysml/lops/Unary.java
index 7c1ceeb..a5403b2 100644
--- a/src/main/java/org/apache/sysml/lops/Unary.java
+++ b/src/main/java/org/apache/sysml/lops/Unary.java
@@ -325,11 +325,14 @@ public class Unary extends Lop
                }
        }
        
-       public static boolean isCumulativeOp(OperationTypes op) {
+       public static boolean isMultiThreadedOp(OperationTypes op) {
                return op==OperationTypes.CUMSUM
                        || op==OperationTypes.CUMPROD
                        || op==OperationTypes.CUMMIN
-                       || op==OperationTypes.CUMMAX;
+                       || op==OperationTypes.CUMMAX
+                       || op==OperationTypes.EXP
+                       || op==OperationTypes.LOG
+                       || op==OperationTypes.SIGMOID;
        }
        
        @Override
@@ -351,7 +354,7 @@ public class Unary extends Lop
                sb.append( prepOutputOperand(output) );
                
                //num threads for cumulative cp ops
-               if( getExecType() == ExecType.CP && isCumulativeOp(operation) ) 
{
+               if( getExecType() == ExecType.CP && 
isMultiThreadedOp(operation) ) {
                        sb.append( OPERAND_DELIMITOR );
                        sb.append( _numThreads );
                }

http://git-wip-us.apache.org/repos/asf/systemml/blob/4c7640b8/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
index e13f2a7..a7f5834 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
@@ -1317,7 +1317,8 @@ public class OptimizerRuleBased extends Optimizer
                                                         && 
!HopRewriteUtils.isValidOp(((ParameterizedBuiltinOp)h).getOp(), 
                                                                
ParamBuiltinOp.GROUPEDAGG, ParamBuiltinOp.REXPAND))
                                                && !( h instanceof UnaryOp 
//only unaryop-cumulativeagg
-                                                        && 
!((UnaryOp)h).isCumulativeUnaryOperation() )
+                                                        && 
!((UnaryOp)h).isCumulativeUnaryOperation()
+                                                        && 
!((UnaryOp)h).isExpensiveUnaryOperation())
                                                && !( h instanceof ReorgOp 
//only reorgop-transpose
                                                         && 
((ReorgOp)h).getOp() != ReOrgOp.TRANS ))
                                        {

http://git-wip-us.apache.org/repos/asf/systemml/blob/4c7640b8/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java 
b/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java
index 9dbb24e..395a4ec 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java
@@ -64,6 +64,7 @@ import 
org.apache.sysml.runtime.instructions.cp.UaggOuterChainCPInstruction;
 import org.apache.sysml.runtime.instructions.cp.UnaryCPInstruction;
 import org.apache.sysml.runtime.instructions.cp.VariableCPInstruction;
 import 
org.apache.sysml.runtime.instructions.cpfile.MatrixIndexingCPFileInstruction;
+import org.apache.sysml.runtime.util.UtilFunctions;
 
 public class CPInstructionParser extends InstructionParser 
 {
@@ -385,7 +386,8 @@ public class CPInstructionParser extends InstructionParser
                        case Builtin: 
                                String []parts = 
InstructionUtils.getInstructionPartsWithValueType(str);
                                if ( parts[0].equals("log") || 
parts[0].equals("log_nz") ) {
-                                       if ( parts.length == 3 ) {
+                                       if ( parts.length == 3 || (parts.length 
== 4 &&
+                                               
UtilFunctions.isIntegerNumber(parts[3])) ) {
                                                // B=log(A), y=log(x)
                                                return 
UnaryCPInstruction.parseInstruction(str);
                                        } else if ( parts.length == 4 ) {

http://git-wip-us.apache.org/repos/asf/systemml/blob/4c7640b8/src/main/java/org/apache/sysml/runtime/instructions/cp/UnaryCPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/cp/UnaryCPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/cp/UnaryCPInstruction.java
index 07be2a7..8e023ea 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/cp/UnaryCPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/cp/UnaryCPInstruction.java
@@ -62,7 +62,7 @@ public abstract class UnaryCPInstruction extends 
ComputationCPInstruction {
                        out.split(parts[2]);
                        func = Builtin.getBuiltinFnObject(opcode);
                        
-                       if( Arrays.asList(new 
String[]{"ucumk+","ucum*","ucummin","ucummax"}).contains(opcode) )
+                       if( Arrays.asList(new 
String[]{"ucumk+","ucum*","ucummin","ucummax","exp","log","sigmoid"}).contains(opcode)
 )
                                return new UnaryMatrixCPInstruction(new 
UnaryOperator(func,Integer.parseInt(parts[3])), in, out, opcode, str); 
                        else
                                return new UnaryScalarCPInstruction(null, in, 
out, opcode, str);

http://git-wip-us.apache.org/repos/asf/systemml/blob/4c7640b8/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
index f738227..bb5e79b 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
@@ -2577,16 +2577,25 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
                        ret.reset(rlen, clen, sp);
                
                //core execute
-               if( LibMatrixAgg.isSupportedUnaryOperator(op) ) 
-               {
+               if( LibMatrixAgg.isSupportedUnaryOperator(op) ) {
                        //e.g., cumsum/cumprod/cummin/cumax
                        if( op.getNumThreads() > 1 )
                                LibMatrixAgg.cumaggregateUnaryMatrix(this, ret, 
op, op.getNumThreads());
                        else
                                LibMatrixAgg.cumaggregateUnaryMatrix(this, ret, 
op);
                }
-               else
-               {
+               else if(!sparse && !isEmptyBlock(false) && 
getDenseBlock().isContiguous()
+                       && 
OptimizerUtils.isMaxLocalParallelism(op.getNumThreads())) {
+                       //note: we apply multi-threading in a best-effort 
manner here
+                       //only for expensive operators such as exp, log, 
sigmoid, because
+                       //otherwise allocation, read and write anyway dominates
+                       ret.allocateDenseBlock(false);
+                       double[] a = getDenseBlockValues();
+                       double[] c = ret.getDenseBlockValues();
+                       Arrays.parallelSetAll(c, i -> op.fn.execute(a[i]));
+                       ret.recomputeNonZeros();
+               }
+               else {
                        //default execute unary operations
                        if(op.sparseSafe)
                                sparseUnaryOperations(op, ret);

Reply via email to