[SYSTEMML-400] Multi-threaded dense/sparse-dense transpose operations

This patch introduces multi-threaded transpose operations into the compiler and runtime. Despite a significant serial fraction (for result allocation), multi-threading improved performance by 2-3x for sufficiently large matrices. This change also refactors the core transpose operations to reduce code duplication and to enable a common code path for single- and multi-threaded operations.
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/77b8e088 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/77b8e088 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/77b8e088 Branch: refs/heads/master Commit: 77b8e0888a333e3124fcc8ba3145a2f059f8f0fc Parents: 785d151 Author: Matthias Boehm <[email protected]> Authored: Fri Apr 29 22:49:32 2016 -0700 Committer: Matthias Boehm <[email protected]> Committed: Sat Apr 30 14:00:58 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/sysml/hops/ReorgOp.java | 27 ++- .../java/org/apache/sysml/lops/Transform.java | 29 +-- .../parfor/opt/OptimizerRuleBased.java | 4 +- .../instructions/cp/ReorgCPInstruction.java | 7 +- .../runtime/matrix/data/LibMatrixReorg.java | 194 ++++++++++++++----- .../runtime/matrix/operators/ReorgOperator.java | 20 +- 6 files changed, 209 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/77b8e088/src/main/java/org/apache/sysml/hops/ReorgOp.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/ReorgOp.java b/src/main/java/org/apache/sysml/hops/ReorgOp.java index a88a098..d81e777 100644 --- a/src/main/java/org/apache/sysml/hops/ReorgOp.java +++ b/src/main/java/org/apache/sysml/hops/ReorgOp.java @@ -22,6 +22,7 @@ package org.apache.sysml.hops; import java.util.ArrayList; import org.apache.sysml.conf.ConfigurationManager; +import org.apache.sysml.hops.Hop.MultiThreadedHop; import org.apache.sysml.hops.rewrite.HopRewriteUtils; import org.apache.sysml.lops.Aggregate; import org.apache.sysml.lops.Group; @@ -47,7 +48,7 @@ import org.apache.sysml.runtime.matrix.MatrixCharacteristics; * and (2) most importantly semantic of reshape is exactly a reorg 
op. */ -public class ReorgOp extends Hop +public class ReorgOp extends Hop implements MultiThreadedHop { public static boolean FORCE_DIST_SORT_INDEXES = false; @@ -55,7 +56,8 @@ public class ReorgOp extends Hop public boolean bSortSPRewriteApplicable = false; private ReOrgOp op; - + private int _maxNumThreads = -1; //-1 for unlimited + private ReorgOp() { //default constructor for clone } @@ -86,6 +88,16 @@ public class ReorgOp extends Hop refreshSizeInformation(); } + @Override + public void setMaxNumThreads( int k ) { + _maxNumThreads = k; + } + + @Override + public int getMaxNumThreads() { + return _maxNumThreads; + } + public ReOrgOp getOp() { return op; @@ -111,6 +123,15 @@ public class ReorgOp extends Hop switch( op ) { case TRANSPOSE: + { + int k = OptimizerUtils.getConstrainedNumThreads(_maxNumThreads); + Transform transform1 = new Transform( getInput().get(0).constructLops(), + HopsTransf2Lops.get(op), getDataType(), getValueType(), et, k); + setOutputDimensions(transform1); + setLineNumbers(transform1); + setLops(transform1); + break; + } case DIAG: { Transform transform1 = new Transform( getInput().get(0).constructLops(), @@ -591,6 +612,7 @@ public class ReorgOp extends Hop //copy specific attributes ret.op = op; + ret._maxNumThreads = _maxNumThreads; return ret; } @@ -603,6 +625,7 @@ public class ReorgOp extends Hop ReorgOp that2 = (ReorgOp)that; boolean ret = (op == that2.op) + && (_maxNumThreads == that2._maxNumThreads) && (getInput().size()==that.getInput().size()); //compare all childs (see reshape, sort) http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/77b8e088/src/main/java/org/apache/sysml/lops/Transform.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/lops/Transform.java b/src/main/java/org/apache/sysml/lops/Transform.java index e666aa8..6b995f2 100644 --- a/src/main/java/org/apache/sysml/lops/Transform.java +++ 
b/src/main/java/org/apache/sysml/lops/Transform.java @@ -32,8 +32,6 @@ import org.apache.sysml.parser.Expression.*; public class Transform extends Lop { - - public enum OperationTypes { Transpose, Diag, @@ -42,30 +40,32 @@ public class Transform extends Lop Rev }; - private boolean _bSortIndInMem = false; - private OperationTypes operation = null; - + private boolean _bSortIndInMem = false; + private int _numThreads = 1; + /** * Constructor when we have one input. * @param input * @param op */ - public Transform(Lop input, Transform.OperationTypes op, DataType dt, ValueType vt, ExecType et) - { + public Transform(Lop input, Transform.OperationTypes op, DataType dt, ValueType vt, ExecType et) { + this(input, op, dt, vt, et, 1); + } + + public Transform(Lop input, Transform.OperationTypes op, DataType dt, ValueType vt, ExecType et, int k) { super(Lop.Type.Transform, dt, vt); init(input, op, dt, vt, et); + _numThreads = k; } - public Transform(Lop input, Transform.OperationTypes op, DataType dt, ValueType vt) - { + public Transform(Lop input, Transform.OperationTypes op, DataType dt, ValueType vt) { super(Lop.Type.Transform, dt, vt); init(input, op, dt, vt, ExecType.MR); } - public Transform(Lop input, Transform.OperationTypes op, DataType dt, ValueType vt, ExecType et, boolean bSortIndInMem) - { + public Transform(Lop input, Transform.OperationTypes op, DataType dt, ValueType vt, ExecType et, boolean bSortIndInMem) { super(Lop.Type.Transform, dt, vt); _bSortIndInMem = bSortIndInMem; init(input, op, dt, vt, et); @@ -166,6 +166,11 @@ public class Transform extends Lop sb.append( getInputs().get(0).prepInputOperand(input1)); sb.append( OPERAND_DELIMITOR ); sb.append( this.prepOutputOperand(output)); + + if( getExecType()==ExecType.CP && operation == OperationTypes.Transpose ) { + sb.append( OPERAND_DELIMITOR ); + sb.append( _numThreads ); + } return sb.toString(); } @@ -198,7 +203,7 @@ public class Transform extends Lop if( getExecType()==ExecType.SPARK && operation == 
OperationTypes.Sort ){ sb.append( OPERAND_DELIMITOR ); - sb.append( _bSortIndInMem); + sb.append( _bSortIndInMem ); } return sb.toString(); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/77b8e088/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java index 8344522..4ac54ce 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java @@ -1538,7 +1538,9 @@ public class OptimizerRuleBased extends Optimizer && !( h instanceof ParameterizedBuiltinOp //only paramop-grpagg && ((ParameterizedBuiltinOp)h).getOp()!=ParamBuiltinOp.GROUPEDAGG) && !( h instanceof UnaryOp //only unaryop-cumulativeagg - && !((UnaryOp)h).isCumulativeUnaryOperation() ) ) + && !((UnaryOp)h).isCumulativeUnaryOperation() ) + && !( h instanceof ReorgOp //only reorgop-transpose + && ((ReorgOp)h).getOp() != ReOrgOp.TRANSPOSE )) { MultiThreadedHop mhop = (MultiThreadedHop) h; mhop.setMaxNumThreads(opsK); //set max constraint in hop http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/77b8e088/src/main/java/org/apache/sysml/runtime/instructions/cp/ReorgCPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cp/ReorgCPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cp/ReorgCPInstruction.java index 7e37c21..63156a1 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/cp/ReorgCPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/cp/ReorgCPInstruction.java @@ -83,8 +83,11 @@ public class 
ReorgCPInstruction extends UnaryCPInstruction String opcode = parts[0]; if ( opcode.equalsIgnoreCase("r'") ) { - parseUnaryInstruction(str, in, out); //max 2 operands - return new ReorgCPInstruction(new ReorgOperator(SwapIndex.getSwapIndexFnObject()), in, out, opcode, str); + InstructionUtils.checkNumFields(str, 2, 3); + in.split(parts[1]); + out.split(parts[2]); + int k = Integer.parseInt(parts[3]); + return new ReorgCPInstruction(new ReorgOperator(SwapIndex.getSwapIndexFnObject(), k), in, out, opcode, str); } else if ( opcode.equalsIgnoreCase("rev") ) { parseUnaryInstruction(str, in, out); //max 2 operands http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/77b8e088/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java index 710c5bc..67800fc 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java @@ -26,7 +26,12 @@ import java.util.Collection; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map.Entry; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.functionobjects.DiagIndex; @@ -54,7 +59,7 @@ import org.apache.sysml.runtime.util.UtilFunctions; */ public class LibMatrixReorg { - + public static final long PAR_NUMCELL_THRESHOLD = 1024*1024; //Min 1M elements public static final boolean SHALLOW_DENSE_VECTOR_TRANSPOSE = true; public static final boolean SHALLOW_DENSE_ROWWISE_RESHAPE = true; public static final boolean ALLOW_BLOCK_REUSE = false; 
@@ -101,7 +106,10 @@ public class LibMatrixReorg switch( type ) { case TRANSPOSE: - return transpose(in, out); + if( op.getNumThreads() > 1 ) + return transpose(in, out, op.getNumThreads()); + else + return transpose(in, out); case REV: return rev(in, out); case DIAG: @@ -125,18 +133,37 @@ public class LibMatrixReorg public static MatrixBlock transpose( MatrixBlock in, MatrixBlock out ) throws DMLRuntimeException { - //Timing time = new Timing(true); - //sparse-safe operation if( in.isEmptyBlock(false) ) return out; + + //set basic meta data + out.nonZeros = in.nonZeros; + + //shallow dense vector transpose (w/o result allocation) + //since the physical representation of dense vectors is always the same, + //we don't need to create a copy, given our copy on write semantics. + //however, note that with update in-place this would be an invalid optimization + if( SHALLOW_DENSE_VECTOR_TRANSPOSE && !in.sparse && !out.sparse && (in.rlen==1 || in.clen==1) ) { + out.denseBlock = in.denseBlock; + return out; + } + //Timing time = new Timing(true); + + //allocate output arrays (if required) + if( out.sparse ) + out.allocateSparseRowsBlock(false); + else + out.allocateDenseBlock(false); + + //execute transpose operation if( !in.sparse && !out.sparse ) - transposeDenseToDense( in, out ); + transposeDenseToDense( in, out, 0, in.rlen, 0, in.clen ); else if( in.sparse && out.sparse ) transposeSparseToSparse( in, out ); else if( in.sparse ) - transposeSparseToDense( in, out ); + transposeSparseToDense( in, out, 0, in.rlen, 0, in.clen ); else transposeDenseToSparse( in, out ); @@ -149,6 +176,59 @@ public class LibMatrixReorg * * @param in * @param out + * @param k + * @return + * @throws DMLRuntimeException + */ + public static MatrixBlock transpose( MatrixBlock in, MatrixBlock out, int k ) + throws DMLRuntimeException + { + //redirect small or special cases to sequential execution + if( in.isEmptyBlock(false) || (in.rlen * in.clen < PAR_NUMCELL_THRESHOLD) + || 
(SHALLOW_DENSE_VECTOR_TRANSPOSE && !in.sparse && !out.sparse && (in.rlen==1 || in.clen==1) ) + || (in.sparse && !out.sparse && in.rlen==1) || out.sparse ) + { + return transpose(in, out); + } + + //Timing time = new Timing(true); + + //set meta data and allocate output arrays (if required) + out.nonZeros = in.nonZeros; + if( out.sparse ) + out.allocateSparseRowsBlock(false); + else + out.allocateDenseBlock(false); + + //core multi-threaded transpose + try { + ExecutorService pool = Executors.newFixedThreadPool( k ); + ArrayList<TransposeTask> tasks = new ArrayList<TransposeTask>(); + boolean row = in.sparse || in.rlen >= in.clen; + int len = row ? in.rlen : in.clen; + int blklen = (int)(Math.ceil((double)len/k)); + blklen += (blklen%8 != 0)?8-blklen%8:0; + for( int i=0; i<k & i*blklen<in.rlen; i++ ) + tasks.add(new TransposeTask(in, out, row, i*blklen, Math.min((i+1)*blklen, len))); + //execute tasks and check for errors + List<Future<Object>> taskret = pool.invokeAll(tasks); + pool.shutdown(); + for( Future<Object> task : taskret ) + task.get(); + } + catch(Exception ex) { + throw new DMLRuntimeException(ex); + } + + //System.out.println("r' k="+k+" ("+in.rlen+", "+in.clen+", "+in.sparse+", "+out.sparse+") in "+time.stop()+" ms."); + + return out; + } + + /** + * + * @param in + * @param out * @return * @throws DMLRuntimeException */ @@ -721,36 +801,21 @@ public class LibMatrixReorg * @param out * @throws DMLRuntimeException */ - private static void transposeDenseToDense(MatrixBlock in, MatrixBlock out) + private static void transposeDenseToDense(MatrixBlock in, MatrixBlock out, int rl, int ru, int cl, int cu) throws DMLRuntimeException { final int m = in.rlen; final int n = in.clen; - final int m2 = out.rlen; final int n2 = out.clen; - //set basic meta data - out.sparse = false; - out.nonZeros = in.nonZeros; - - //shallow dense vector transpose (w/o result allocation) - if( SHALLOW_DENSE_VECTOR_TRANSPOSE && (m==1 || n==1) ) { - //since the physical representation 
of dense vectors is always the same, - //we don't need to create a copy, given our copy on write semantics. - //however, note that with update in-place this would be an invalid optimization - out.denseBlock = in.denseBlock; - return; - } - - //allocate output arrays (if required) - out.allocateDenseBlock(false); - double[] a = in.getDenseBlock(); double[] c = out.getDenseBlock(); if( m==1 || n==1 ) //VECTOR TRANSPOSE { - System.arraycopy(a, 0, c, 0, m2*n2); + //plain memcopy, in case shallow dense copy no applied + int ix = rl+cl; int len = ru+cu-ix-1; + System.arraycopy(a, ix, c, ix, len); } else //MATRIX TRANSPOSE { @@ -759,11 +824,11 @@ public class LibMatrixReorg final int blocksizeJ = 128; //blocked execution - for( int bi = 0; bi<m; bi+=blocksizeI ) - for( int bj = 0; bj<n; bj+=blocksizeJ ) + for( int bi = rl; bi<ru; bi+=blocksizeI ) + for( int bj = cl; bj<cu; bj+=blocksizeJ ) { - int bimin = Math.min(bi+blocksizeI, m); - int bjmin = Math.min(bj+blocksizeJ, n); + int bimin = Math.min(bi+blocksizeI, ru); + int bjmin = Math.min(bj+blocksizeJ, cu); //core transpose operation for( int i=bi; i<bimin; i++ ) { @@ -782,16 +847,14 @@ public class LibMatrixReorg */ private static void transposeDenseToSparse(MatrixBlock in, MatrixBlock out) { + //NOTE: called only in sequential execution + final int m = in.rlen; final int n = in.clen; final int m2 = out.rlen; final int n2 = out.clen; final int ennz2 = (int) (in.nonZeros/m2); - //allocate output arrays (if required) - out.reset(m2, n2, true); //always sparse - out.allocateSparseRowsBlock(); - double[] a = in.getDenseBlock(); SparseBlock c = out.getSparseBlock(); @@ -813,8 +876,6 @@ public class LibMatrixReorg c.append(j, i, a[aix]); } } - - out.nonZeros = in.nonZeros; } /** @@ -824,16 +885,14 @@ public class LibMatrixReorg */ private static void transposeSparseToSparse(MatrixBlock in, MatrixBlock out) { + //NOTE: called only in sequential execution + final int m = in.rlen; final int n = in.clen; final int m2 = out.rlen; 
final int n2 = out.clen; final int ennz2 = (int) (in.nonZeros/m2); - //allocate output arrays (if required) - out.reset(m2, n2, true); //always sparse - out.allocateSparseRowsBlock(); - SparseBlock a = in.getSparseBlock(); SparseBlock c = out.getSparseBlock(); @@ -889,7 +948,6 @@ public class LibMatrixReorg } } } - out.nonZeros = in.nonZeros; } /** @@ -898,23 +956,19 @@ public class LibMatrixReorg * @param out * @throws DMLRuntimeException */ - private static void transposeSparseToDense(MatrixBlock in, MatrixBlock out) + private static void transposeSparseToDense(MatrixBlock in, MatrixBlock out, int rl, int ru, int cl, int cu) throws DMLRuntimeException { final int m = in.rlen; final int n = in.clen; - final int m2 = out.rlen; final int n2 = out.clen; - //allocate output arrays (if required) - out.reset(m2, n2, false); //always dense - out.allocateDenseBlock(); - SparseBlock a = in.getSparseBlock(); double[] c = out.getDenseBlock(); if( m==1 ) //ROW VECTOR TRANSPOSE { + //NOTE: called only in sequential execution int alen = a.size(0); //always pos 0 int[] aix = a.indexes(0); double[] avals = a.values(0); @@ -931,12 +985,12 @@ public class LibMatrixReorg int[] ix = new int[blocksizeI]; //blocked execution - for( int bi = 0; bi<m; bi+=blocksizeI ) + for( int bi = rl; bi<ru; bi+=blocksizeI ) { Arrays.fill(ix, 0); for( int bj = 0; bj<n; bj+=blocksizeJ ) { - int bimin = Math.min(bi+blocksizeI, m); + int bimin = Math.min(bi+blocksizeI, ru); int bjmin = Math.min(bj+blocksizeJ, n); //core transpose operation @@ -956,7 +1010,6 @@ public class LibMatrixReorg } } } - out.nonZeros = in.nonZeros; } /** @@ -2266,4 +2319,47 @@ public class LibMatrixReorg return (val0 > val1 ? -1 : (val0 == val1 ? 
0 : 1)); } } + + /** + * + */ + private static class TransposeTask implements Callable<Object> + { + private MatrixBlock _in = null; + private MatrixBlock _out = null; + private boolean _row = false; + private int _rl = -1; + private int _ru = -1; + + protected TransposeTask(MatrixBlock in, MatrixBlock out, boolean row, int rl, int ru) + throws DMLRuntimeException + { + _in = in; + _out = out; + _row = row; + _rl = rl; + _ru = ru; + } + + @Override + public Object call() throws DMLRuntimeException + { + int rl = _row ? _rl : 0; + int ru = _row ? _ru : _in.rlen; + int cl = _row ? 0 : _rl; + int cu = _row ? _in.clen : _ru; + + //execute transpose operation + if( !_in.sparse && !_out.sparse ) + transposeDenseToDense( _in, _out, rl, ru, cl, cu ); + else if( _in.sparse && _out.sparse ) + throw new DMLRuntimeException("Unsupported multi-threaded sparse-sparse transpose."); + else if( _in.sparse ) + transposeSparseToDense( _in, _out, rl, ru, cl, cu ); + else + throw new DMLRuntimeException("Unsupported multi-threaded dense-sparse transpose."); + + return null; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/77b8e088/src/main/java/org/apache/sysml/runtime/matrix/operators/ReorgOperator.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/operators/ReorgOperator.java b/src/main/java/org/apache/sysml/runtime/matrix/operators/ReorgOperator.java index 407a4da..54c346f 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/operators/ReorgOperator.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/operators/ReorgOperator.java @@ -26,16 +26,24 @@ import org.apache.sysml.runtime.functionobjects.IndexFunction; public class ReorgOperator extends Operator implements Serializable { - - private static final long serialVersionUID = -5322516429026298404L; public IndexFunction fn; + private int k; //num threads - public ReorgOperator(IndexFunction p) - { - fn=p; - 
sparseSafe=true; + public ReorgOperator(IndexFunction p) { + //default degree of parallelism is 1 + //(for example in MR/Spark because we parallelize over the number of blocks) + this( p, 1 ); + } + + public ReorgOperator(IndexFunction p, int numThreads) { + fn = p; + sparseSafe = true; + k = numThreads; } + public int getNumThreads() { + return k; + } }
