[SYSTEMML-400] Multi-threaded dense/sparse-dense transpose operations

This patch introduces multi-threaded transpose operations into the compiler
and runtime. Despite a significant serial fraction (for result
allocation), multi-threading improved performance by 2-3x for
sufficiently large matrices. This change also includes a refactoring of
the core transpose operations to reduce code duplication and enable a common
code path for single- and multi-threaded operations.

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/77b8e088
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/77b8e088
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/77b8e088

Branch: refs/heads/master
Commit: 77b8e0888a333e3124fcc8ba3145a2f059f8f0fc
Parents: 785d151
Author: Matthias Boehm <[email protected]>
Authored: Fri Apr 29 22:49:32 2016 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Sat Apr 30 14:00:58 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/sysml/hops/ReorgOp.java     |  27 ++-
 .../java/org/apache/sysml/lops/Transform.java   |  29 +--
 .../parfor/opt/OptimizerRuleBased.java          |   4 +-
 .../instructions/cp/ReorgCPInstruction.java     |   7 +-
 .../runtime/matrix/data/LibMatrixReorg.java     | 194 ++++++++++++++-----
 .../runtime/matrix/operators/ReorgOperator.java |  20 +-
 6 files changed, 209 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/77b8e088/src/main/java/org/apache/sysml/hops/ReorgOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/ReorgOp.java 
b/src/main/java/org/apache/sysml/hops/ReorgOp.java
index a88a098..d81e777 100644
--- a/src/main/java/org/apache/sysml/hops/ReorgOp.java
+++ b/src/main/java/org/apache/sysml/hops/ReorgOp.java
@@ -22,6 +22,7 @@ package org.apache.sysml.hops;
 import java.util.ArrayList;
 
 import org.apache.sysml.conf.ConfigurationManager;
+import org.apache.sysml.hops.Hop.MultiThreadedHop;
 import org.apache.sysml.hops.rewrite.HopRewriteUtils;
 import org.apache.sysml.lops.Aggregate;
 import org.apache.sysml.lops.Group;
@@ -47,7 +48,7 @@ import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
  *  and (2) most importantly semantic of reshape is exactly a reorg op. 
  */
 
-public class ReorgOp extends Hop 
+public class ReorgOp extends Hop implements MultiThreadedHop
 {
        
        public static boolean FORCE_DIST_SORT_INDEXES = false;
@@ -55,7 +56,8 @@ public class ReorgOp extends Hop
        public boolean bSortSPRewriteApplicable = false;
        
        private ReOrgOp op;
-
+       private int _maxNumThreads = -1; //-1 for unlimited
+       
        private ReorgOp() {
                //default constructor for clone
        }
@@ -86,6 +88,16 @@ public class ReorgOp extends Hop
                refreshSizeInformation();
        }
 
+       @Override
+       public void setMaxNumThreads( int k ) {
+               _maxNumThreads = k;
+       }
+       
+       @Override
+       public int getMaxNumThreads() {
+               return _maxNumThreads;
+       }
+       
        public ReOrgOp getOp()
        {
                return op;
@@ -111,6 +123,15 @@ public class ReorgOp extends Hop
                switch( op )
                {
                        case TRANSPOSE:
+                       {
+                               int k = 
OptimizerUtils.getConstrainedNumThreads(_maxNumThreads);
+                               Transform transform1 = new Transform( 
getInput().get(0).constructLops(), 
+                                               HopsTransf2Lops.get(op), 
getDataType(), getValueType(), et, k);
+                               setOutputDimensions(transform1);
+                               setLineNumbers(transform1);
+                               setLops(transform1);                    
+                               break;
+                       }
                        case DIAG:
                        {
                                Transform transform1 = new Transform( 
getInput().get(0).constructLops(), 
@@ -591,6 +612,7 @@ public class ReorgOp extends Hop
                
                //copy specific attributes
                ret.op = op;
+               ret._maxNumThreads = _maxNumThreads;
                
                return ret;
        }
@@ -603,6 +625,7 @@ public class ReorgOp extends Hop
                
                ReorgOp that2 = (ReorgOp)that;          
                boolean ret =  (op == that2.op)
+                                   && (_maxNumThreads == that2._maxNumThreads)
                                    && 
(getInput().size()==that.getInput().size());
                                
                //compare all childs (see reshape, sort)

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/77b8e088/src/main/java/org/apache/sysml/lops/Transform.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/lops/Transform.java 
b/src/main/java/org/apache/sysml/lops/Transform.java
index e666aa8..6b995f2 100644
--- a/src/main/java/org/apache/sysml/lops/Transform.java
+++ b/src/main/java/org/apache/sysml/lops/Transform.java
@@ -32,8 +32,6 @@ import org.apache.sysml.parser.Expression.*;
 
 public class Transform extends Lop
 {
-
-       
        public enum OperationTypes {
                Transpose,
                Diag,
@@ -42,30 +40,32 @@ public class Transform extends Lop
                Rev
        };
        
-       private boolean _bSortIndInMem = false;
-       
        private OperationTypes operation = null;
-       
+       private boolean _bSortIndInMem = false;
+       private int _numThreads = 1;
+               
        /**
         * Constructor when we have one input.
         * @param input
         * @param op
         */
 
-       public Transform(Lop input, Transform.OperationTypes op, DataType dt, 
ValueType vt, ExecType et) 
-       {
+       public Transform(Lop input, Transform.OperationTypes op, DataType dt, 
ValueType vt, ExecType et) {
+               this(input, op, dt, vt, et, 1);         
+       }
+       
+       public Transform(Lop input, Transform.OperationTypes op, DataType dt, 
ValueType vt, ExecType et, int k)  {
                super(Lop.Type.Transform, dt, vt);              
                init(input, op, dt, vt, et);
+               _numThreads = k;
        }
        
-       public Transform(Lop input, Transform.OperationTypes op, DataType dt, 
ValueType vt) 
-       {
+       public Transform(Lop input, Transform.OperationTypes op, DataType dt, 
ValueType vt) {
                super(Lop.Type.Transform, dt, vt);              
                init(input, op, dt, vt, ExecType.MR);
        }
 
-       public Transform(Lop input, Transform.OperationTypes op, DataType dt, 
ValueType vt, ExecType et, boolean bSortIndInMem) 
-       {
+       public Transform(Lop input, Transform.OperationTypes op, DataType dt, 
ValueType vt, ExecType et, boolean bSortIndInMem) {
                super(Lop.Type.Transform, dt, vt);              
                _bSortIndInMem = bSortIndInMem;
                init(input, op, dt, vt, et);
@@ -166,6 +166,11 @@ public class Transform extends Lop
                sb.append( getInputs().get(0).prepInputOperand(input1));
                sb.append( OPERAND_DELIMITOR );
                sb.append( this.prepOutputOperand(output));
+
+               if( getExecType()==ExecType.CP && operation == 
OperationTypes.Transpose ) {
+                       sb.append( OPERAND_DELIMITOR );
+                       sb.append( _numThreads );
+               }
                
                return sb.toString();
        }
@@ -198,7 +203,7 @@ public class Transform extends Lop
                
                if( getExecType()==ExecType.SPARK && operation == 
OperationTypes.Sort ){
                        sb.append( OPERAND_DELIMITOR );
-                       sb.append( _bSortIndInMem);
+                       sb.append( _bSortIndInMem );
                }
                
                return sb.toString();

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/77b8e088/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
index 8344522..4ac54ce 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
@@ -1538,7 +1538,9 @@ public class OptimizerRuleBased extends Optimizer
                                                && !( h instanceof 
ParameterizedBuiltinOp //only paramop-grpagg
                                                         && 
((ParameterizedBuiltinOp)h).getOp()!=ParamBuiltinOp.GROUPEDAGG)
                                                && !( h instanceof UnaryOp 
//only unaryop-cumulativeagg
-                                                        && 
!((UnaryOp)h).isCumulativeUnaryOperation() )      )
+                                                        && 
!((UnaryOp)h).isCumulativeUnaryOperation() )
+                                               && !( h instanceof ReorgOp 
//only reorgop-transpose
+                                                        && 
((ReorgOp)h).getOp() != ReOrgOp.TRANSPOSE ))
                                        {
                                                MultiThreadedHop mhop = 
(MultiThreadedHop) h;
                                                mhop.setMaxNumThreads(opsK); 
//set max constraint in hop

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/77b8e088/src/main/java/org/apache/sysml/runtime/instructions/cp/ReorgCPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/cp/ReorgCPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/cp/ReorgCPInstruction.java
index 7e37c21..63156a1 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/cp/ReorgCPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/cp/ReorgCPInstruction.java
@@ -83,8 +83,11 @@ public class ReorgCPInstruction extends UnaryCPInstruction
                String opcode = parts[0];
                
                if ( opcode.equalsIgnoreCase("r'") ) {
-                       parseUnaryInstruction(str, in, out); //max 2 operands
-                       return new ReorgCPInstruction(new 
ReorgOperator(SwapIndex.getSwapIndexFnObject()), in, out, opcode, str);
+                       InstructionUtils.checkNumFields(str, 2, 3);
+                       in.split(parts[1]);
+                       out.split(parts[2]);
+                       int k = Integer.parseInt(parts[3]);
+                       return new ReorgCPInstruction(new 
ReorgOperator(SwapIndex.getSwapIndexFnObject(), k), in, out, opcode, str);
                } 
                else if ( opcode.equalsIgnoreCase("rev") ) {
                        parseUnaryInstruction(str, in, out); //max 2 operands

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/77b8e088/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
index 710c5bc..67800fc 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
@@ -26,7 +26,12 @@ import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.functionobjects.DiagIndex;
@@ -54,7 +59,7 @@ import org.apache.sysml.runtime.util.UtilFunctions;
  */
 public class LibMatrixReorg 
 {
-       
+       public static final long PAR_NUMCELL_THRESHOLD = 1024*1024;   //Min 1M 
elements
        public static final boolean SHALLOW_DENSE_VECTOR_TRANSPOSE = true;
        public static final boolean SHALLOW_DENSE_ROWWISE_RESHAPE = true;
        public static final boolean ALLOW_BLOCK_REUSE = false;
@@ -101,7 +106,10 @@ public class LibMatrixReorg
                switch( type )
                {
                        case TRANSPOSE: 
-                               return transpose(in, out);
+                               if( op.getNumThreads() > 1 )
+                                       return transpose(in, out, 
op.getNumThreads());
+                               else
+                                       return transpose(in, out);
                        case REV: 
                                return rev(in, out);
                        case DIAG:      
@@ -125,18 +133,37 @@ public class LibMatrixReorg
        public static MatrixBlock transpose( MatrixBlock in, MatrixBlock out ) 
                throws DMLRuntimeException
        {
-               //Timing time = new Timing(true);
-       
                //sparse-safe operation
                if( in.isEmptyBlock(false) )
                        return out;
+       
+               //set basic meta data
+               out.nonZeros = in.nonZeros;
+               
+               //shallow dense vector transpose (w/o result allocation)
+               //since the physical representation of dense vectors is always 
the same,
+               //we don't need to create a copy, given our copy on write 
semantics.
+               //however, note that with update in-place this would be an 
invalid optimization
+               if( SHALLOW_DENSE_VECTOR_TRANSPOSE && !in.sparse && !out.sparse 
&& (in.rlen==1 || in.clen==1)  ) {
+                       out.denseBlock = in.denseBlock;
+                       return out;
+               }
                
+               //Timing time = new Timing(true);
+               
+               //allocate output arrays (if required)
+               if( out.sparse )
+                       out.allocateSparseRowsBlock(false);
+               else
+                       out.allocateDenseBlock(false);
+       
+               //execute transpose operation
                if( !in.sparse && !out.sparse )
-                       transposeDenseToDense( in, out );
+                       transposeDenseToDense( in, out, 0, in.rlen, 0, in.clen 
);
                else if( in.sparse && out.sparse )
                        transposeSparseToSparse( in, out );
                else if( in.sparse )
-                       transposeSparseToDense( in, out );
+                       transposeSparseToDense( in, out, 0, in.rlen, 0, in.clen 
);
                else
                        transposeDenseToSparse( in, out );
                
@@ -149,6 +176,59 @@ public class LibMatrixReorg
         * 
         * @param in
         * @param out
+        * @param k
+        * @return
+        * @throws DMLRuntimeException
+        */
+       public static MatrixBlock transpose( MatrixBlock in, MatrixBlock out, 
int k ) 
+               throws DMLRuntimeException
+       {
+               //redirect small or special cases to sequential execution
+               if( in.isEmptyBlock(false) || (in.rlen * in.clen < 
PAR_NUMCELL_THRESHOLD)
+                       || (SHALLOW_DENSE_VECTOR_TRANSPOSE && !in.sparse && 
!out.sparse && (in.rlen==1 || in.clen==1) )
+                       || (in.sparse && !out.sparse && in.rlen==1) || 
out.sparse )
+               {
+                       return transpose(in, out);
+               }
+               
+               //Timing time = new Timing(true);
+               
+               //set meta data and allocate output arrays (if required)
+               out.nonZeros = in.nonZeros;
+               if( out.sparse )
+                       out.allocateSparseRowsBlock(false);
+               else
+                       out.allocateDenseBlock(false);
+               
+               //core multi-threaded transpose
+               try {
+                       ExecutorService pool = Executors.newFixedThreadPool( k 
);
+                       ArrayList<TransposeTask> tasks = new 
ArrayList<TransposeTask>();
+                       boolean row = in.sparse || in.rlen >= in.clen;
+                       int len = row ? in.rlen : in.clen;
+                       int blklen = (int)(Math.ceil((double)len/k));
+                       blklen += (blklen%8 != 0)?8-blklen%8:0;
+                       for( int i=0; i<k & i*blklen<in.rlen; i++ )
+                               tasks.add(new TransposeTask(in, out, row, 
i*blklen, Math.min((i+1)*blklen, len)));
+                       //execute tasks and check for errors
+                       List<Future<Object>> taskret = pool.invokeAll(tasks);   
+                       pool.shutdown();
+                       for( Future<Object> task : taskret )
+                               task.get();
+               }
+               catch(Exception ex) {
+                       throw new DMLRuntimeException(ex);
+               }       
+               
+               //System.out.println("r' k="+k+" ("+in.rlen+", "+in.clen+", 
"+in.sparse+", "+out.sparse+") in "+time.stop()+" ms.");
+               
+               return out;
+       }
+       
+       /**
+        * 
+        * @param in
+        * @param out
         * @return
         * @throws DMLRuntimeException
         */
@@ -721,36 +801,21 @@ public class LibMatrixReorg
         * @param out
         * @throws DMLRuntimeException 
         */
-       private static void transposeDenseToDense(MatrixBlock in, MatrixBlock 
out) 
+       private static void transposeDenseToDense(MatrixBlock in, MatrixBlock 
out, int rl, int ru, int cl, int cu) 
                throws DMLRuntimeException
        {
                final int m = in.rlen;
                final int n = in.clen;
-               final int m2 = out.rlen;
                final int n2 = out.clen;
                
-               //set basic meta data
-               out.sparse = false;
-               out.nonZeros = in.nonZeros;
-               
-               //shallow dense vector transpose (w/o result allocation)
-               if( SHALLOW_DENSE_VECTOR_TRANSPOSE && (m==1 || n==1) ) {
-                       //since the physical representation of dense vectors is 
always the same,
-                       //we don't need to create a copy, given our copy on 
write semantics.
-                       //however, note that with update in-place this would be 
an invalid optimization
-                       out.denseBlock = in.denseBlock;
-                       return;
-               }
-               
-               //allocate output arrays (if required)
-               out.allocateDenseBlock(false);
-               
                double[] a = in.getDenseBlock();
                double[] c = out.getDenseBlock();
                
                if( m==1 || n==1 ) //VECTOR TRANSPOSE
                {
-                       System.arraycopy(a, 0, c, 0, m2*n2);
+                       //plain memcopy, in case shallow dense copy not applied 
+                       int ix = rl+cl; int len = ru+cu-ix-1;
+                       System.arraycopy(a, ix, c, ix, len);
                }
                else //MATRIX TRANSPOSE
                {
@@ -759,11 +824,11 @@ public class LibMatrixReorg
                        final int blocksizeJ = 128; 
                        
                        //blocked execution
-                       for( int bi = 0; bi<m; bi+=blocksizeI )
-                               for( int bj = 0; bj<n; bj+=blocksizeJ )
+                       for( int bi = rl; bi<ru; bi+=blocksizeI )
+                               for( int bj = cl; bj<cu; bj+=blocksizeJ )
                                {
-                                       int bimin = Math.min(bi+blocksizeI, m);
-                                       int bjmin = Math.min(bj+blocksizeJ, n);
+                                       int bimin = Math.min(bi+blocksizeI, ru);
+                                       int bjmin = Math.min(bj+blocksizeJ, cu);
                                        //core transpose operation
                                        for( int i=bi; i<bimin; i++ )
                                        {
@@ -782,16 +847,14 @@ public class LibMatrixReorg
         */
        private static void transposeDenseToSparse(MatrixBlock in, MatrixBlock 
out)
        {
+               //NOTE: called only in sequential execution
+               
                final int m = in.rlen;
                final int n = in.clen;
                final int m2 = out.rlen;
                final int n2 = out.clen;
                final int ennz2 = (int) (in.nonZeros/m2); 
                
-               //allocate output arrays (if required)
-               out.reset(m2, n2, true); //always sparse
-               out.allocateSparseRowsBlock();
-                               
                double[] a = in.getDenseBlock();
                SparseBlock c = out.getSparseBlock();
                
@@ -813,8 +876,6 @@ public class LibMatrixReorg
                                                c.append(j, i, a[aix]);
                                        }
                        }
-               
-               out.nonZeros = in.nonZeros;
        }
        
        /**
@@ -824,16 +885,14 @@ public class LibMatrixReorg
         */
        private static void transposeSparseToSparse(MatrixBlock in, MatrixBlock 
out)
        {
+               //NOTE: called only in sequential execution
+               
                final int m = in.rlen;
                final int n = in.clen;
                final int m2 = out.rlen;
                final int n2 = out.clen;
                final int ennz2 = (int) (in.nonZeros/m2); 
                
-               //allocate output arrays (if required)
-               out.reset(m2, n2, true); //always sparse
-               out.allocateSparseRowsBlock();
-               
                SparseBlock a = in.getSparseBlock();
                SparseBlock c = out.getSparseBlock();
 
@@ -889,7 +948,6 @@ public class LibMatrixReorg
                                }
                        }
                }
-               out.nonZeros = in.nonZeros;
        }
        
        /**
@@ -898,23 +956,19 @@ public class LibMatrixReorg
         * @param out
         * @throws DMLRuntimeException 
         */
-       private static void transposeSparseToDense(MatrixBlock in, MatrixBlock 
out) 
+       private static void transposeSparseToDense(MatrixBlock in, MatrixBlock 
out, int rl, int ru, int cl, int cu) 
                throws DMLRuntimeException
        {
                final int m = in.rlen;
                final int n = in.clen;
-               final int m2 = out.rlen;
                final int n2 = out.clen;
                
-               //allocate output arrays (if required)
-               out.reset(m2, n2, false); //always dense
-               out.allocateDenseBlock();
-               
                SparseBlock a = in.getSparseBlock();
                double[] c = out.getDenseBlock();
                
                if( m==1 ) //ROW VECTOR TRANSPOSE
                {
+                       //NOTE: called only in sequential execution
                        int alen = a.size(0); //always pos 0
                        int[] aix = a.indexes(0);
                        double[] avals = a.values(0);
@@ -931,12 +985,12 @@ public class LibMatrixReorg
                        int[] ix = new int[blocksizeI];
                        
                        //blocked execution
-                       for( int bi = 0; bi<m; bi+=blocksizeI )
+                       for( int bi = rl; bi<ru; bi+=blocksizeI )
                        {
                                Arrays.fill(ix, 0);
                                for( int bj = 0; bj<n; bj+=blocksizeJ )
                                {
-                                       int bimin = Math.min(bi+blocksizeI, m);
+                                       int bimin = Math.min(bi+blocksizeI, ru);
                                        int bjmin = Math.min(bj+blocksizeJ, n);
        
                                        //core transpose operation
@@ -956,7 +1010,6 @@ public class LibMatrixReorg
                                }
                        }
                }
-               out.nonZeros = in.nonZeros;
        }
        
        /**
@@ -2266,4 +2319,47 @@ public class LibMatrixReorg
                        return (val0 > val1 ? -1 : (val0 == val1 ? 0 : 1));
                }               
        }
+       
+       /**
+        * 
+        */
+       private static class TransposeTask implements Callable<Object>
+       {
+               private MatrixBlock _in = null;
+               private MatrixBlock _out = null;
+               private boolean _row = false;
+               private int _rl = -1;
+               private int _ru = -1;
+
+               protected TransposeTask(MatrixBlock in, MatrixBlock out, 
boolean row, int rl, int ru) 
+                       throws DMLRuntimeException
+               {
+                       _in = in;
+                       _out = out;
+                       _row = row;
+                       _rl = rl;
+                       _ru = ru;
+               }
+               
+               @Override
+               public Object call() throws DMLRuntimeException
+               {
+                       int rl = _row ? _rl : 0;
+                       int ru = _row ? _ru : _in.rlen;
+                       int cl = _row ? 0 : _rl;
+                       int cu = _row ? _in.clen : _ru;
+                       
+                       //execute transpose operation
+                       if( !_in.sparse && !_out.sparse )
+                               transposeDenseToDense( _in, _out, rl, ru, cl, 
cu );
+                       else if( _in.sparse && _out.sparse )
+                               throw new DMLRuntimeException("Unsupported 
multi-threaded sparse-sparse transpose.");
+                       else if( _in.sparse )
+                               transposeSparseToDense( _in, _out, rl, ru, cl, 
cu );
+                       else
+                               throw new DMLRuntimeException("Unsupported 
multi-threaded dense-sparse transpose.");
+                       
+                       return null;
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/77b8e088/src/main/java/org/apache/sysml/runtime/matrix/operators/ReorgOperator.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/operators/ReorgOperator.java 
b/src/main/java/org/apache/sysml/runtime/matrix/operators/ReorgOperator.java
index 407a4da..54c346f 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/operators/ReorgOperator.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/operators/ReorgOperator.java
@@ -26,16 +26,24 @@ import 
org.apache.sysml.runtime.functionobjects.IndexFunction;
 
 public class ReorgOperator  extends Operator implements Serializable
 {
-
-
        private static final long serialVersionUID = -5322516429026298404L;
 
        public IndexFunction fn;
+       private int k; //num threads
        
-       public ReorgOperator(IndexFunction p)
-       {
-               fn=p;
-               sparseSafe=true;
+       public ReorgOperator(IndexFunction p) {
+               //default degree of parallelism is 1 
+               //(for example in MR/Spark because we parallelize over the 
number of blocks)
+               this( p, 1 );
+       }
+               
+       public ReorgOperator(IndexFunction p, int numThreads) {
+               fn = p;
+               sparseSafe = true;
+               k = numThreads;
        }
 
+       public int getNumThreads() {
+               return k;
+       }
 }

Reply via email to