[SYSTEMML-2262] New multi-threaded dense unary ops (exp, log, sigmoid) This patch introduces best-effort multi-threading for expensive dense unary operations such as exp, log, and sigmoid. Whereas for other unary operations the output allocation, read, and write dominate anyway, these operations experience a substantial speedup with multi-threading, which we simply apply via Arrays.parallelSetAll.
Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/4c7640b8 Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/4c7640b8 Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/4c7640b8 Branch: refs/heads/master Commit: 4c7640b873dfa921409cbef375c856aa48940932 Parents: 45d86bd Author: Matthias Boehm <[email protected]> Authored: Fri Apr 20 01:39:15 2018 -0700 Committer: Matthias Boehm <[email protected]> Committed: Fri Apr 20 01:39:15 2018 -0700 ---------------------------------------------------------------------- .../org/apache/sysml/hops/OptimizerUtils.java | 16 ++++----- .../java/org/apache/sysml/hops/UnaryOp.java | 36 ++++++++++++-------- src/main/java/org/apache/sysml/lops/Unary.java | 9 +++-- .../parfor/opt/OptimizerRuleBased.java | 3 +- .../instructions/CPInstructionParser.java | 4 ++- .../instructions/cp/UnaryCPInstruction.java | 2 +- .../sysml/runtime/matrix/data/MatrixBlock.java | 17 ++++++--- 7 files changed, 54 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/4c7640b8/src/main/java/org/apache/sysml/hops/OptimizerUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java index 5ae20eb..2d76759 100644 --- a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java +++ b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java @@ -414,30 +414,30 @@ public class OptimizerUtils * * @return local memory budget */ - public static double getLocalMemBudget() - { + public static double getLocalMemBudget() { double ret = InfrastructureAnalyzer.getLocalMaxMemory(); return ret * OptimizerUtils.MEM_UTIL_FACTOR; } - public static double getRemoteMemBudgetMap() - { + public static double getRemoteMemBudgetMap() { return 
getRemoteMemBudgetMap(false); } - public static double getRemoteMemBudgetMap(boolean substractSortBuffer) - { + public static double getRemoteMemBudgetMap(boolean substractSortBuffer) { double ret = InfrastructureAnalyzer.getRemoteMaxMemoryMap(); if( substractSortBuffer ) ret -= InfrastructureAnalyzer.getRemoteMaxMemorySortBuffer(); return ret * OptimizerUtils.MEM_UTIL_FACTOR; } - public static double getRemoteMemBudgetReduce() - { + public static double getRemoteMemBudgetReduce() { double ret = InfrastructureAnalyzer.getRemoteMaxMemoryReduce(); return ret * OptimizerUtils.MEM_UTIL_FACTOR; } + + public static boolean isMaxLocalParallelism(int k) { + return InfrastructureAnalyzer.getLocalParallelism() == k; + } public static boolean checkSparkBroadcastMemoryBudget( double size ) { http://git-wip-us.apache.org/repos/asf/systemml/blob/4c7640b8/src/main/java/org/apache/sysml/hops/UnaryOp.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/UnaryOp.java b/src/main/java/org/apache/sysml/hops/UnaryOp.java index a6a9d66..7b7c685 100644 --- a/src/main/java/org/apache/sysml/hops/UnaryOp.java +++ b/src/main/java/org/apache/sysml/hops/UnaryOp.java @@ -177,9 +177,10 @@ public class UnaryOp extends Hop implements MultiThreadedHop } else //default unary { - int k = isCumulativeUnaryOperation() ? OptimizerUtils.getConstrainedNumThreads( _maxNumThreads ) : 1; - Unary unary1 = new Unary(input.constructLops(), HopsOpOp1LopsU.get(_op), - getDataType(), getValueType(), et, k); + int k = isCumulativeUnaryOperation() || isExpensiveUnaryOperation() ? 
+ OptimizerUtils.getConstrainedNumThreads( _maxNumThreads ) : 1; + Unary unary1 = new Unary(input.constructLops(), + HopsOpOp1LopsU.get(_op), getDataType(), getValueType(), et, k); setOutputDimensions(unary1); setLineNumbers(unary1); setLops(unary1); @@ -612,21 +613,26 @@ public class UnaryOp extends Hop implements MultiThreadedHop return ( _op == OpOp1.INVERSE ); } - public boolean isCumulativeUnaryOperation() - { - return ( _op == OpOp1.CUMSUM - || _op == OpOp1.CUMPROD - || _op == OpOp1.CUMMIN - || _op == OpOp1.CUMMAX ); + public boolean isCumulativeUnaryOperation() { + return (_op == OpOp1.CUMSUM + || _op == OpOp1.CUMPROD + || _op == OpOp1.CUMMIN + || _op == OpOp1.CUMMAX); } public boolean isCastUnaryOperation() { - return ( _op == OpOp1.CAST_AS_MATRIX - || _op == OpOp1.CAST_AS_SCALAR - || _op == OpOp1.CAST_AS_FRAME - || _op == OpOp1.CAST_AS_BOOLEAN - || _op == OpOp1.CAST_AS_DOUBLE - || _op == OpOp1.CAST_AS_INT ); + return (_op == OpOp1.CAST_AS_MATRIX + || _op == OpOp1.CAST_AS_SCALAR + || _op == OpOp1.CAST_AS_FRAME + || _op == OpOp1.CAST_AS_BOOLEAN + || _op == OpOp1.CAST_AS_DOUBLE + || _op == OpOp1.CAST_AS_INT); + } + + public boolean isExpensiveUnaryOperation() { + return (_op == OpOp1.EXP + || _op == OpOp1.LOG + || _op == OpOp1.SIGMOID); } @Override http://git-wip-us.apache.org/repos/asf/systemml/blob/4c7640b8/src/main/java/org/apache/sysml/lops/Unary.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/lops/Unary.java b/src/main/java/org/apache/sysml/lops/Unary.java index 7c1ceeb..a5403b2 100644 --- a/src/main/java/org/apache/sysml/lops/Unary.java +++ b/src/main/java/org/apache/sysml/lops/Unary.java @@ -325,11 +325,14 @@ public class Unary extends Lop } } - public static boolean isCumulativeOp(OperationTypes op) { + public static boolean isMultiThreadedOp(OperationTypes op) { return op==OperationTypes.CUMSUM || op==OperationTypes.CUMPROD || op==OperationTypes.CUMMIN - || 
op==OperationTypes.CUMMAX; + || op==OperationTypes.CUMMAX + || op==OperationTypes.EXP + || op==OperationTypes.LOG + || op==OperationTypes.SIGMOID; } @Override @@ -351,7 +354,7 @@ public class Unary extends Lop sb.append( prepOutputOperand(output) ); //num threads for cumulative cp ops - if( getExecType() == ExecType.CP && isCumulativeOp(operation) ) { + if( getExecType() == ExecType.CP && isMultiThreadedOp(operation) ) { sb.append( OPERAND_DELIMITOR ); sb.append( _numThreads ); } http://git-wip-us.apache.org/repos/asf/systemml/blob/4c7640b8/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java index e13f2a7..a7f5834 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java @@ -1317,7 +1317,8 @@ public class OptimizerRuleBased extends Optimizer && !HopRewriteUtils.isValidOp(((ParameterizedBuiltinOp)h).getOp(), ParamBuiltinOp.GROUPEDAGG, ParamBuiltinOp.REXPAND)) && !( h instanceof UnaryOp //only unaryop-cumulativeagg - && !((UnaryOp)h).isCumulativeUnaryOperation() ) + && !((UnaryOp)h).isCumulativeUnaryOperation() + && !((UnaryOp)h).isExpensiveUnaryOperation()) && !( h instanceof ReorgOp //only reorgop-transpose && ((ReorgOp)h).getOp() != ReOrgOp.TRANS )) { http://git-wip-us.apache.org/repos/asf/systemml/blob/4c7640b8/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java b/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java index 
9dbb24e..395a4ec 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java @@ -64,6 +64,7 @@ import org.apache.sysml.runtime.instructions.cp.UaggOuterChainCPInstruction; import org.apache.sysml.runtime.instructions.cp.UnaryCPInstruction; import org.apache.sysml.runtime.instructions.cp.VariableCPInstruction; import org.apache.sysml.runtime.instructions.cpfile.MatrixIndexingCPFileInstruction; +import org.apache.sysml.runtime.util.UtilFunctions; public class CPInstructionParser extends InstructionParser { @@ -385,7 +386,8 @@ public class CPInstructionParser extends InstructionParser case Builtin: String []parts = InstructionUtils.getInstructionPartsWithValueType(str); if ( parts[0].equals("log") || parts[0].equals("log_nz") ) { - if ( parts.length == 3 ) { + if ( parts.length == 3 || (parts.length == 4 && + UtilFunctions.isIntegerNumber(parts[3])) ) { // B=log(A), y=log(x) return UnaryCPInstruction.parseInstruction(str); } else if ( parts.length == 4 ) { http://git-wip-us.apache.org/repos/asf/systemml/blob/4c7640b8/src/main/java/org/apache/sysml/runtime/instructions/cp/UnaryCPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cp/UnaryCPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cp/UnaryCPInstruction.java index 07be2a7..8e023ea 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/cp/UnaryCPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/cp/UnaryCPInstruction.java @@ -62,7 +62,7 @@ public abstract class UnaryCPInstruction extends ComputationCPInstruction { out.split(parts[2]); func = Builtin.getBuiltinFnObject(opcode); - if( Arrays.asList(new String[]{"ucumk+","ucum*","ucummin","ucummax"}).contains(opcode) ) + if( Arrays.asList(new 
String[]{"ucumk+","ucum*","ucummin","ucummax","exp","log","sigmoid"}).contains(opcode) ) return new UnaryMatrixCPInstruction(new UnaryOperator(func,Integer.parseInt(parts[3])), in, out, opcode, str); else return new UnaryScalarCPInstruction(null, in, out, opcode, str); http://git-wip-us.apache.org/repos/asf/systemml/blob/4c7640b8/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java index f738227..bb5e79b 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java @@ -2577,16 +2577,25 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab ret.reset(rlen, clen, sp); //core execute - if( LibMatrixAgg.isSupportedUnaryOperator(op) ) - { + if( LibMatrixAgg.isSupportedUnaryOperator(op) ) { //e.g., cumsum/cumprod/cummin/cumax if( op.getNumThreads() > 1 ) LibMatrixAgg.cumaggregateUnaryMatrix(this, ret, op, op.getNumThreads()); else LibMatrixAgg.cumaggregateUnaryMatrix(this, ret, op); } - else - { + else if(!sparse && !isEmptyBlock(false) && getDenseBlock().isContiguous() + && OptimizerUtils.isMaxLocalParallelism(op.getNumThreads())) { + //note: we apply multi-threading in a best-effort manner here + //only for expensive operators such as exp, log, sigmoid, because + //otherwise allocation, read and write anyway dominates + ret.allocateDenseBlock(false); + double[] a = getDenseBlockValues(); + double[] c = ret.getDenseBlockValues(); + Arrays.parallelSetAll(c, i -> op.fn.execute(a[i])); + ret.recomputeNonZeros(); + } + else { //default execute unary operations if(op.sparseSafe) sparseUnaryOperations(op, ret);
