[SYSTEMML-1310] New parfor block partitioning (compiler/runtime/tests)

This patch extends the parfor optimizer and runtime with support for
block partitioning, i.e., the partitioning and access of batches of rows
or columns. Batches are required to be aligned and smaller than or equal
to the blocksize in the partitioning dimension, but can be arbitrarily
large in the other dimension. So far, only the remote spark data
partitioner supports these new partitioning formats, but the resulting
partitions can be used in either local or remote parfor operators.


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/39f75ca0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/39f75ca0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/39f75ca0

Branch: refs/heads/master
Commit: 39f75ca06012f192d6a8bdea33f1be0bd89fbd8f
Parents: f693bca
Author: Matthias Boehm <[email protected]>
Authored: Tue Mar 21 00:13:27 2017 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Tue Mar 21 12:38:24 2017 -0700

----------------------------------------------------------------------
 .../sysml/parser/ParForStatementBlock.java      | 245 ++++++++++---------
 .../controlprogram/ParForProgramBlock.java      |  82 +++++--
 .../controlprogram/caching/MatrixObject.java    |  16 ++
 .../controlprogram/parfor/DataPartitioner.java  |   3 +-
 .../parfor/DataPartitionerLocal.java            |   9 +-
 .../parfor/DataPartitionerRemoteMR.java         |   8 +-
 .../parfor/DataPartitionerRemoteSpark.java      |   8 +-
 .../DataPartitionerRemoteSparkMapper.java       |  58 +++--
 .../controlprogram/parfor/ProgramConverter.java |  10 +-
 .../controlprogram/parfor/RemoteDPParForMR.java |  12 +-
 .../parfor/RemoteDPParForSpark.java             |  21 +-
 .../parfor/RemoteDPParForSparkWorker.java       |   9 +-
 .../controlprogram/parfor/opt/OptNode.java      |   3 +-
 .../parfor/opt/OptimizerConstrained.java        |  14 +-
 .../parfor/opt/OptimizerRuleBased.java          |  57 +++--
 .../cpfile/MatrixIndexingCPFileInstruction.java |   8 +-
 .../sysml/runtime/matrix/DataPartitionMR.java   |   5 +-
 .../ParForBlockwiseDataPartitioningTest.java    | 215 ++++++++++++++++
 .../parfor/parfor_bcdatapartitioning_neg.R      |  37 +++
 .../parfor/parfor_bcdatapartitioning_neg.dml    |  37 +++
 .../parfor/parfor_bcdatapartitioning_pos.R      |  37 +++
 .../parfor/parfor_bcdatapartitioning_pos.dml    |  37 +++
 .../parfor/parfor_brdatapartitioning_neg.R      |  37 +++
 .../parfor/parfor_brdatapartitioning_neg.dml    |  37 +++
 .../parfor/parfor_brdatapartitioning_pos.R      |  37 +++
 .../parfor/parfor_brdatapartitioning_pos.dml    |  37 +++
 26 files changed, 851 insertions(+), 228 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/39f75ca0/src/main/java/org/apache/sysml/parser/ParForStatementBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/parser/ParForStatementBlock.java 
b/src/main/java/org/apache/sysml/parser/ParForStatementBlock.java
index 5ca0cbc..784df63 100644
--- a/src/main/java/org/apache/sysml/parser/ParForStatementBlock.java
+++ b/src/main/java/org/apache/sysml/parser/ParForStatementBlock.java
@@ -30,7 +30,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
-
+import org.apache.sysml.conf.ConfigurationManager;
+import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.parser.Expression.BinaryOp;
 import org.apache.sysml.parser.Expression.BuiltinFunctionOp;
 import org.apache.sysml.parser.Expression.DataType;
@@ -41,6 +42,7 @@ import 
org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PExecMode;
 import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.POptMode;
 import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PResultMerge;
 import 
org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PTaskPartitioner;
+import 
org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PartitionFormat;
 import 
org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysml.runtime.controlprogram.parfor.stat.Timing;
 import org.apache.sysml.runtime.controlprogram.parfor.util.IDSequence;
@@ -403,43 +405,27 @@ public class ParForStatementBlock extends 
ForStatementBlock
         * @param var variables
         * @return partition format
         */
-       public PDataPartitionFormat determineDataPartitionFormat(String var) 
+       public PartitionFormat determineDataPartitionFormat(String var) 
        {
-               PDataPartitionFormat dpf = null;
-               List<PDataPartitionFormat> dpfc = new 
LinkedList<PDataPartitionFormat>();
+               PartitionFormat dpf = null;
+               List<PartitionFormat> dpfc = new LinkedList<PartitionFormat>();
                
                try 
                {
                        //determine partitioning candidates
-                       ParForStatement pfs = (ParForStatement) 
_statements.get(0);
-                       rDeterminePartitioningCandidates(var, pfs.getBody(), 
dpfc);
+                       ParForStatement dpfs = (ParForStatement) 
_statements.get(0);
+                       rDeterminePartitioningCandidates(var, dpfs.getBody(), 
dpfc);
                        
                        //determine final solution              
-                       for( PDataPartitionFormat tmp : dpfc )
-                       {
-                               //System.out.println(var+": "+tmp);
-                               if( dpf != null && dpf!=tmp ) //if no consensus
-                                       dpf = PDataPartitionFormat.NONE;        
-                               else
-                                       dpf = tmp;
-                                       
-                               /* TODO block partitioning
-                               if( dpf == null || dpf==tmp ) //consensus
-                                       dpf = tmp;
-                               else if(   
dpf==PDataPartitionFormat.BLOCK_WISE_M_N //subsumption 
-                                               || 
tmp==PDataPartitionFormat.BLOCK_WISE_M_N )
-                                       dpf = 
PDataPartitionFormat.BLOCK_WISE_M_N;
-                               else //no consensus
-                                       dpf = PDataPartitionFormat.NONE;
-                               */                      
-                       }
+                       for( PartitionFormat tmp : dpfc )
+                               dpf = ( dpf!=null && !dpf.equals(tmp) ) ? //if 
no consensus
+                                       PartitionFormat.NONE : tmp;
                        if( dpf == null )
-                               dpf = PDataPartitionFormat.NONE;
+                               dpf = PartitionFormat.NONE;
                }
-               catch (LanguageException e) 
-               {
+               catch (LanguageException e) {
                        LOG.trace( "Unable to determine partitioning 
candidates.", e );
-                       dpf = PDataPartitionFormat.NONE;
+                       dpf = PartitionFormat.NONE;
                }
                
                return dpf;
@@ -520,7 +506,7 @@ public class ParForStatementBlock extends ForStatementBlock
         * @param C list of partition formats
         * @throws LanguageException if LanguageException occurs
         */
-       private void rDeterminePartitioningCandidates(String var, 
ArrayList<StatementBlock> asb, List<PDataPartitionFormat> C) 
+       private void rDeterminePartitioningCandidates(String var, 
ArrayList<StatementBlock> asb, List<PartitionFormat> C) 
                throws LanguageException 
        {
                for(StatementBlock sb : asb ) // foreach statementblock in 
parforbody
@@ -570,61 +556,77 @@ public class ParForStatementBlock extends 
ForStatementBlock
                        }
        }
 
-       private void rDeterminePartitioningCandidates(String var, 
List<DataIdentifier> datsRead, List<PDataPartitionFormat> C)
+       private void rDeterminePartitioningCandidates(String var, 
List<DataIdentifier> datsRead, List<PartitionFormat> C)
        {
-               if( datsRead != null )
-                       for(DataIdentifier read : datsRead)
-                       { 
-                               String readStr = read.getName();                
                                        
-                               if( var.equals( readStr ) ) 
-                               {
-                                       if( read instanceof IndexedIdentifier )
-                                       {
-                                               IndexedIdentifier idat = 
(IndexedIdentifier) read;
-                                               C.add( 
determineAccessPattern(idat) );
-                                       }
-                                       else if( read instanceof DataIdentifier 
)
-                                       {
-                                               C.add( 
PDataPartitionFormat.NONE );
-                                       }
-                               }
+               if( datsRead == null )
+                       return;
+               
+               for(DataIdentifier read : datsRead)
+                       if( var.equals( read.getName() ) ) {
+                               if( read instanceof IndexedIdentifier )
+                                       C.add( 
determineAccessPattern((IndexedIdentifier) read) );
+                               else if( read instanceof DataIdentifier )
+                                       C.add( PartitionFormat.NONE );
                        }
        }
        
-       private PDataPartitionFormat determineAccessPattern( IndexedIdentifier 
dat )
+       private PartitionFormat determineAccessPattern( IndexedIdentifier dat )
        {
-               PDataPartitionFormat dpf = null;
+               boolean isSpark = OptimizerUtils.isSparkExecutionMode();
+               int blksz = ConfigurationManager.getBlocksize();
+               PartitionFormat dpf = null;
                
                //1) get all bounds expressions for index access
                Expression rowL = dat.getRowLowerBound();
                Expression rowU = dat.getRowUpperBound();
                Expression colL = dat.getColLowerBound();
                Expression colU = dat.getColUpperBound();
-               
-               //2) decided on access pattern          
-               //COLUMN_WISE iff row expr is colon (all rows) 
-               //   and access to single column
-               if( rowL == null && rowU == null && 
-                       colL!=null && colU != null && colL.equals(colU) )
-               {
-                       dpf = PDataPartitionFormat.COLUMN_WISE;
-               }               
-               //ROW_WISE iff col expr is colon (all columns) 
-               //   and access to single row
-               else if( colL == null && colU == null &&
-                               rowL!=null && rowU != null && rowL.equals(rowU) 
)
-               {
-                       dpf = PDataPartitionFormat.ROW_WISE;
+               boolean allRows = (rowL == null && rowU == null);
+               boolean allCols = (colL == null && colU == null);
+               
+               try {
+                       //2) decided on access pattern          
+                       //COLUMN_WISE if all rows and access to single column
+                       if( allRows && colL!=null && colL.equals(colU) ) {
+                               dpf = PartitionFormat.COLUMN_WISE;
+                       }               
+                       //ROW_WISE if all cols and access to single row
+                       else if( allCols && rowL!=null && rowL.equals(rowU) ) {
+                               dpf = PartitionFormat.ROW_WISE;
+                       }
+                       //COLUMN_BLOCK_WISE
+                       else if( isSpark && allRows && colL != colU ) {
+                               LinearFunction l1 = getLinearFunction(colL, 
true);
+                               LinearFunction l2 = getLinearFunction(colU, 
true);
+                               dpf = !isAlignedBlocking(l1, l2, blksz) ? 
PartitionFormat.NONE :
+                                       new 
PartitionFormat(PDataPartitionFormat.COLUMN_BLOCK_WISE_N, (int)l1._b[0]);
+                       }
+                       //ROW_BLOCK_WISE
+                       else if( isSpark && allCols && rowL != rowU ) {
+                               LinearFunction l1 = getLinearFunction(rowL, 
true);
+                               LinearFunction l2 = getLinearFunction(rowU, 
true);
+                               dpf = !isAlignedBlocking(l1, l2, blksz) ?  
PartitionFormat.NONE :
+                                       new 
PartitionFormat(PDataPartitionFormat.ROW_BLOCK_WISE_N, (int)l1._b[0]);
+                       }
+                       //NONE otherwise (conservative)
+                       else
+                               dpf = PartitionFormat.NONE;
+               }
+               catch(Exception ex) {
+                       throw new RuntimeException(ex);
                }
-               //NONE otherwise (conservative)
-               else
-                       dpf = PDataPartitionFormat.NONE;
-               
-               //TODO block partitioning
                
                return dpf;
        }
        
+       private static boolean isAlignedBlocking(LinearFunction l1, 
LinearFunction l2, int blksz) {
+               return (l1!=null && l2!=null && l1.equalSlope(l2) //same slope
+                       && l1._b.length==1 && l1._b[0]<=blksz         //single 
index and block
+                       && (l2._a - l1._a + 1 == l1._b[0])            
//intercept difference is slope
+                       && (blksz/l1._b[0])*l1._b[0] == blksz         //aligned 
slope
+                       && l2.eval(1L) == l1._b[0] );                 //aligned 
intercept
+       }
+       
        private void rConsolidateResultVars(ArrayList<StatementBlock> asb, 
ArrayList<String> vars) 
                throws LanguageException 
        {
@@ -1718,6 +1720,28 @@ public class ParForStatementBlock extends 
ForStatementBlock
                return out;
        }
        
+       private LinearFunction getLinearFunction(Expression expr, boolean 
ignoreMinWithConstant) 
+               throws LanguageException {
+               if( expr instanceof IntIdentifier )
+                       return new 
LinearFunction(((IntIdentifier)expr).getValue(), 0, null);
+               else if( expr instanceof BinaryExpression )
+                       return rParseBinaryExpression((BinaryExpression)expr);
+               else if( expr instanceof BuiltinFunctionExpression && 
ignoreMinWithConstant ) {
+                       //note: builtin function expression is also a data 
identifier and hence order before
+                       BuiltinFunctionExpression bexpr = 
(BuiltinFunctionExpression) expr;
+                       if( bexpr.getOpCode()==BuiltinFunctionOp.MIN ) {
+                               if( bexpr.getFirstExpr() instanceof 
BinaryExpression )
+                                       return 
rParseBinaryExpression((BinaryExpression)bexpr.getFirstExpr());
+                               else if( bexpr.getSecondExpr() instanceof 
BinaryExpression )
+                                       return 
rParseBinaryExpression((BinaryExpression)bexpr.getSecondExpr());
+                       }
+               }
+               else if( expr instanceof DataIdentifier )
+                       return new LinearFunction(0, 1, 
((DataIdentifier)expr)._name); //never use public members
+               
+               return null;
+       }
+       
        /**
         * Creates a functionID for a given data identifier (mainly used for 
caching purposes),
         * where data identifiers with equal name and matrix subscripts results 
in equal
@@ -1931,16 +1955,25 @@ public class ParForStatementBlock extends 
ForStatementBlock
                }
                else if( be.getOpCode() == BinaryOp.MULT )
                {
-                       //NOTE: no recursion for MULT expressions
+                       //NOTE: only recursion for MULT expressions, where
+                       //one side is a constant 
                        
                        //atomic case
                        Long cvalL = parseLongConstant(l);
                        Long cvalR = parseLongConstant(r);
                        
-                       if( cvalL != null )
+                       if( cvalL != null && r instanceof DataIdentifier )
                                ret = new LinearFunction(0, 
cvalL,((DataIdentifier)r)._name);   
-                       else if( cvalR != null )
+                       else if( cvalR != null && l instanceof DataIdentifier )
                                ret = new LinearFunction(0, 
cvalR,((DataIdentifier)l)._name);
+                       else if( cvalL != null && r instanceof BinaryExpression 
) {
+                               LinearFunction ltmp = 
rParseBinaryExpression((BinaryExpression)r);
+                               return ltmp.scale(cvalL);
+                       }
+                       else if( cvalR != null && l instanceof BinaryExpression 
) {
+                               LinearFunction ltmp = 
rParseBinaryExpression((BinaryExpression)l);
+                               return ltmp.scale(cvalR);
+                       }
                        else
                                return null; //let dependency analysis fail
                }
@@ -2014,8 +2047,7 @@ public class ParForStatementBlock extends 
ForStatementBlock
                        _vars[0] = name;
                }
                
-               public void addConstant(long value)
-               {
+               public void addConstant(long value) {
                        _a += value;    
                }
 
@@ -2048,19 +2080,26 @@ public class ParForStatementBlock extends 
ForStatementBlock
                        _vars = tmpvars;                        
                }
                
-               public void scale( long scale )
-               {                               
-                       _a *= scale; //-1 because indexing starts at 1 
-                       
+               public LinearFunction scale( long scale ) {
+                       _a *= scale; 
                        for( int i=0; i<_b.length; i++ )
                                if( _b[i] != 0 )
                                        _b[i] *= scale;
+                       return this;
                }
                
-               public void normalize(int index, long lower, long increment) 
-               {
+               public LinearFunction normalize(int index, long lower, long 
increment) {
                        _a -= (_b[index] * lower);
                        _b[index] *= increment;
+                       return this;
+               }
+               
+               public long eval(Long... x) { //evaluates _a + sum(_b[i]*x[i]) for the given variable values
+                       long ret = _a; //start from the constant term
+                       for( int i=0; i<_b.length; i++ )
+                               if( _b[i] != 0 ) //x[i] is only read for non-zero coefficients
+                                       ret += _b[i] * x[i]; //plain multiply: '*=' would overwrite the coefficient
+                       return ret;
+               }
                
                @Override
@@ -2087,48 +2126,36 @@ public class ParForStatementBlock extends 
ForStatementBlock
                }
                
                @Override
-               public boolean equals( Object o2 )
-               {
+               public boolean equals( Object o2 ) {
                        if( o2 == null || !(o2 instanceof LinearFunction)  )
                                return false;
                                
                        LinearFunction f2 = (LinearFunction)o2;
-                       boolean ret = true;
-                       ret &= ( _a == f2._a );
-                       ret &= ( _b.length == f2._b.length );
-                       
-                       if( ret )
-                       {
-                               for( int i=0; i<_b.length; i++ )
-                               {
-                                       ret &= (_b[i] == f2._b[i] );
-                                       ret &= (_vars[i].equals(f2._vars[i]) 
-                                                       
||(_vars[i].startsWith(INTERAL_FN_INDEX_ROW) && 
f2._vars[i].startsWith(INTERAL_FN_INDEX_ROW)) 
-                                                       
||(_vars[i].startsWith(INTERAL_FN_INDEX_COL) && 
f2._vars[i].startsWith(INTERAL_FN_INDEX_COL)) )  ;
-                               }
+                       return ( _a == f2._a )
+                               && equalSlope(f2);
+               }
+
+               public boolean equalSlope(LinearFunction f2) {
+                       boolean ret = ( _b.length == f2._b.length );
+                       for( int i=0; i<_b.length && ret; i++ ) {
+                               ret &= (_b[i] == f2._b[i] );
+                               ret &= (_vars[i].equals(f2._vars[i])
+                                       
||(_vars[i].startsWith(INTERAL_FN_INDEX_ROW) && 
f2._vars[i].startsWith(INTERAL_FN_INDEX_ROW))
+                                       
||(_vars[i].startsWith(INTERAL_FN_INDEX_COL) && 
f2._vars[i].startsWith(INTERAL_FN_INDEX_COL)) )  ;
                        }
-                       
                        return ret;
                }
-
+               
                @Override
-               public int hashCode()
-               {
-                       //use identity hash code
-                       return super.hashCode();
+               public int hashCode() {
+                       return super.hashCode(); //identity
                }
                
-               public boolean hasNonIndexVariables() 
-               {
-                       boolean ret = false;
+               public boolean hasNonIndexVariables() {
                        for( String var : _vars )
                                if( var!=null && 
!_bounds._lower.containsKey(var) )
-                               {
-                                       ret = true;
-                                       break;
-                               }
-                       
-                       return ret;
+                                       return true;
+                       return false;
                }
                
        }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/39f75ca0/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java 
b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
index 0e3574f..a6b8e0e 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
@@ -22,6 +22,7 @@ package org.apache.sysml.runtime.controlprogram;
 import java.io.BufferedWriter;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -179,6 +180,42 @@ public class ParForProgramBlock extends ForProgramBlock
                }
        }
        
+       /**
+        * Convenience class to package PDataPartitionFormat and its parameters.
+        */
+       public static class PartitionFormat implements Serializable {
+               private static final long serialVersionUID = 
4729309847778707801L;
+               public static final PartitionFormat NONE = new 
PartitionFormat(PDataPartitionFormat.NONE, -1);
+               public static final PartitionFormat ROW_WISE = new 
PartitionFormat(PDataPartitionFormat.ROW_WISE, -1);
+               public static final PartitionFormat COLUMN_WISE = new 
PartitionFormat(PDataPartitionFormat.COLUMN_WISE, -1);
+               
+               public final PDataPartitionFormat _dpf;
+               public final int _N;
+               public PartitionFormat(PDataPartitionFormat dpf, int N) {
+                       _dpf = dpf;
+                       _N = N;
+               }
+               @Override
+               public boolean equals(Object o) {
+                       return (o instanceof PartitionFormat)
+                               && _dpf == ((PartitionFormat)o)._dpf
+                               && _N == ((PartitionFormat)o)._N;
+               }
+               @Override
+               public String toString() {
+                       return _dpf.name()+","+_N;      
+               }
+               public static PartitionFormat valueOf(String value) {
+                       String[] parts = value.split(",");
+                       return new PartitionFormat(PDataPartitionFormat
+                               .parsePDataPartitionFormat(parts[0]), 
Integer.parseInt(parts[1]));
+               }
+               public boolean isBlockwise() {
+                       return _dpf == PDataPartitionFormat.COLUMN_BLOCK_WISE_N 
+                               || _dpf == 
PDataPartitionFormat.ROW_BLOCK_WISE_N;
+               }
+       }
+       
        public enum PDataPartitioner {
                NONE,       // no data partitioning
                LOCAL,      // local file based partition split on master node
@@ -893,11 +930,11 @@ public class ParForProgramBlock extends ForProgramBlock
                // Step 0) check and compile to CP (if forced remote parfor)
                boolean flagForced = checkMRAndRecompileToCP(0);
                
-               // Step 1) prepare partitioned input matrix (needs to happen 
before serializing the progam)
+               // Step 1) prepare partitioned input matrix (needs to happen 
before serializing the program)
                ParForStatementBlock sb = (ParForStatementBlock) 
getStatementBlock();
                MatrixObject inputMatrix = 
ec.getMatrixObject(_colocatedDPMatrix );
-               PDataPartitionFormat inputDPF = 
sb.determineDataPartitionFormat( _colocatedDPMatrix );
-               inputMatrix.setPartitioned(inputDPF, 1); //mark matrix var as 
partitioned (for reducers) 
+               PartitionFormat inputDPF = sb.determineDataPartitionFormat( 
_colocatedDPMatrix );
+               inputMatrix.setPartitioned(inputDPF._dpf, inputDPF._N); //mark 
matrix var as partitioned  
                
                // Step 2) init parallel workers (serialize PBs)
                // NOTES: each mapper changes filenames with regard to his ID 
as we submit a single job,
@@ -921,8 +958,8 @@ public class ParForProgramBlock extends ForProgramBlock
                exportMatricesToHDFS(ec);
                                
                // Step 4) submit MR job (wait for finished work)
-               OutputInfo inputOI = ((inputMatrix.getSparsity()<0.1 && 
inputDPF==PDataPartitionFormat.COLUMN_WISE)||
-                                             (inputMatrix.getSparsity()<0.001 
&& inputDPF==PDataPartitionFormat.ROW_WISE))? 
+               OutputInfo inputOI = ((inputMatrix.getSparsity()<0.1 && 
inputDPF==PartitionFormat.COLUMN_WISE)||
+                                             (inputMatrix.getSparsity()<0.001 
&& inputDPF==PartitionFormat.ROW_WISE))? 
                                             OutputInfo.BinaryCellOutputInfo : 
OutputInfo.BinaryBlockOutputInfo;
                RemoteParForJobReturn ret = RemoteDPParForMR.runJob(_ID, 
itervar.getName(), _colocatedDPMatrix, program, resultFile, 
                                inputMatrix, inputDPF, inputOI, _tSparseCol, 
_enableCPCaching, _numThreads, _replicationDP, MAX_RETRYS_ON_ERROR );
@@ -1024,8 +1061,8 @@ public class ParForProgramBlock extends ForProgramBlock
                // Step 1) prepare partitioned input matrix (needs to happen 
before serializing the progam)
                ParForStatementBlock sb = (ParForStatementBlock) 
getStatementBlock();
                MatrixObject inputMatrix = 
ec.getMatrixObject(_colocatedDPMatrix );
-               PDataPartitionFormat inputDPF = 
sb.determineDataPartitionFormat( _colocatedDPMatrix );
-               inputMatrix.setPartitioned(inputDPF, 1); //mark matrix var as 
partitioned (for reducers) 
+               PartitionFormat inputDPF = sb.determineDataPartitionFormat( 
_colocatedDPMatrix );
+               inputMatrix.setPartitioned(inputDPF._dpf, inputDPF._N); //mark 
matrix var as partitioned  
                
                // Step 2) init parallel workers (serialize PBs)
                // NOTES: each mapper changes filenames with regard to his ID 
as we submit a single job,
@@ -1084,7 +1121,8 @@ public class ParForProgramBlock extends ForProgramBlock
        private void handleDataPartitioning( ExecutionContext ec ) 
                throws DMLRuntimeException
        {
-               if( _dataPartitioner != PDataPartitioner.NONE )
+               PDataPartitioner dataPartitioner = _dataPartitioner;
+               if( dataPartitioner != PDataPartitioner.NONE )
                {                       
                        ParForStatementBlock sb = (ParForStatementBlock) 
getStatementBlock();
                        if( sb == null )
@@ -1100,19 +1138,24 @@ public class ParForProgramBlock extends ForProgramBlock
                                {
                                        MatrixObject moVar = (MatrixObject) 
dat; //unpartitioned input
                                        
-                                       PDataPartitionFormat dpf = 
sb.determineDataPartitionFormat( var );
-                                       //dpf = (_optMode != POptMode.NONE) ? 
OptimizerRuleBased.decideBlockWisePartitioning(moVar, dpf) : dpf;
+                                       PartitionFormat dpf = 
sb.determineDataPartitionFormat( var );
                                        LOG.trace("PARFOR ID = "+_ID+", 
Partitioning read-only input variable "+var+" (format="+dpf+", 
mode="+_dataPartitioner+")");
                                        
-                                       if( dpf != PDataPartitionFormat.NONE )
+                                       if( dpf != PartitionFormat.NONE )
                                        {
+                                               if( dataPartitioner != 
PDataPartitioner.REMOTE_SPARK && dpf.isBlockwise() ) {
+                                                       LOG.warn("PARFOR ID = 
"+_ID+", Switching data partitioner from " + dataPartitioner + 
+                                                                       " to " 
+ PDataPartitioner.REMOTE_SPARK.name()+" for blockwise-n partitioning.");
+                                                       dataPartitioner = 
PDataPartitioner.REMOTE_SPARK;
+                                               }
+                                               
                                                Timing ltime = new Timing(true);
                                                
                                                //input data partitioning 
(reuse if possible)
                                                Data dpdatNew = 
_variablesDPReuse.get(var);
                                                if( dpdatNew == null ) //no 
reuse opportunity
                                                {
-                                                       DataPartitioner dp = 
createDataPartitioner( dpf, _dataPartitioner, ec );
+                                                       DataPartitioner dp = 
createDataPartitioner( dpf, dataPartitioner, ec );
                                                        //disable binary cell 
for sparse if consumed by MR jobs
                                                        if(    
!OptimizerRuleBased.allowsBinaryCellPartitions(moVar, dpf )
                                                                || 
OptimizerUtils.isSparkExecutionMode() ) //TODO support for binarycell
@@ -1422,7 +1465,7 @@ public class ParForProgramBlock extends ForProgramBlock
         * @return data partitioner
         * @throws DMLRuntimeException if DMLRuntimeException occurs
         */
-       private DataPartitioner createDataPartitioner(PDataPartitionFormat dpf, 
PDataPartitioner dataPartitioner, ExecutionContext ec) 
+       private DataPartitioner createDataPartitioner(PartitionFormat dpf, 
PDataPartitioner dataPartitioner, ExecutionContext ec) 
                throws DMLRuntimeException 
        {
                DataPartitioner dp = null;
@@ -1439,19 +1482,17 @@ public class ParForProgramBlock extends ForProgramBlock
                switch( dataPartitioner )
                {
                        case LOCAL:
-                               dp = new DataPartitionerLocal(dpf, -1, 
_numThreads);
+                               dp = new DataPartitionerLocal(dpf, _numThreads);
                                break;
                        case REMOTE_MR:
-                               dp = new DataPartitionerRemoteMR( dpf, -1, _ID, 
numRed,
-                                                                         
_replicationDP, 
-                                                                         
MAX_RETRYS_ON_ERROR, 
-                                                                         
ALLOW_REUSE_MR_JVMS, false );
+                               dp = new DataPartitionerRemoteMR( dpf, _ID, 
numRed,
+                                       _replicationDP, MAX_RETRYS_ON_ERROR, 
ALLOW_REUSE_MR_JVMS, false );
                                break;
                        case REMOTE_SPARK:
-                               dp = new DataPartitionerRemoteSpark( dpf, -1, 
ec, numRed, false );
+                               dp = new DataPartitionerRemoteSpark( dpf, ec, 
numRed, false );
                                break;  
                        default:
-                               throw new DMLRuntimeException("Undefined data 
partitioner: '" +dataPartitioner.toString()+"'.");
+                               throw new DMLRuntimeException("Unknown data 
partitioner: '" +dataPartitioner.name()+"'.");
                }
                
                return dp;
@@ -1983,7 +2024,6 @@ public class ParForProgramBlock extends ForProgramBlock
                        }
                }
        }
-       
 
        public String printBlockErrorLocation(){
                return "ERROR: Runtime error in parfor program block generated 
from parfor statement block between lines " + _beginLine + " and " + _endLine + 
" -- ";

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/39f75ca0/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
index 48855ee..dc3cfd1 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
@@ -293,6 +293,10 @@ public class MatrixObject extends 
CacheableData<MatrixBlock>
                                                rows = brlen;
                                                cols = mc.getCols();
                                                break;
+                                       case ROW_BLOCK_WISE_N: 
+                                               rows = _partitionSize;
+                                               cols = mc.getCols();
+                                               break;
                                        case COLUMN_WISE:
                                                rows = mc.getRows();
                                                cols = 1;
@@ -301,6 +305,10 @@ public class MatrixObject extends 
CacheableData<MatrixBlock>
                                                rows = mc.getRows();
                                                cols = bclen;
                                                break;
+                                       case COLUMN_BLOCK_WISE_N: 
+                                               rows = mc.getRows();
+                                               cols = _partitionSize;
+                                               break;  
                                        default:
                                                throw new 
CacheException("Unsupported partition format: "+_partitionFormat);
                                }
@@ -370,6 +378,10 @@ public class MatrixObject extends 
CacheableData<MatrixBlock>
                                sb.append(Lop.FILE_SEPARATOR);
                                sb.append((pred.rowStart-1)/brlen+1);
                                break;
+                       case ROW_BLOCK_WISE_N:
+                               sb.append(Lop.FILE_SEPARATOR);
+                               sb.append((pred.rowStart-1)/_partitionSize+1);
+                               break;
                        case COLUMN_WISE:
                                sb.append(Lop.FILE_SEPARATOR);
                                sb.append(pred.colStart);
@@ -378,6 +390,10 @@ public class MatrixObject extends 
CacheableData<MatrixBlock>
                                sb.append(Lop.FILE_SEPARATOR);
                                sb.append((pred.colStart-1)/bclen+1);
                                break;
+                       case COLUMN_BLOCK_WISE_N:
+                               sb.append(Lop.FILE_SEPARATOR);
+                               sb.append((pred.colStart-1)/_partitionSize+1);
+                               break;  
                        default:
                                throw new CacheException ("MatrixObject not 
available to indexed read.");
                }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/39f75ca0/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitioner.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitioner.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitioner.java
index ab76b55..d836e18 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitioner.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitioner.java
@@ -57,8 +57,7 @@ public abstract class DataPartitioner
        protected int _n = -1; //blocksize if applicable
        protected boolean _allowBinarycell = true;
        
-       protected DataPartitioner( PDataPartitionFormat dpf, int n )
-       {
+       protected DataPartitioner( PDataPartitionFormat dpf, int n ) {
                _format = dpf;
                _n = n;
        }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/39f75ca0/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java
index d16d8a5..eb273b6 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import 
org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
+import 
org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PartitionFormat;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysml.runtime.controlprogram.parfor.util.Cell;
 import org.apache.sysml.runtime.controlprogram.parfor.util.IDSequence;
@@ -78,7 +79,6 @@ import org.apache.sysml.runtime.util.LocalFileUtils;
  */
 public class DataPartitionerLocal extends DataPartitioner
 {
-       
        private static final boolean PARALLEL = true; 
        
        private IDSequence _seq = null;
@@ -94,13 +94,12 @@ public class DataPartitionerLocal extends DataPartitioner
         * @param par -1 for serial otherwise number of threads, can be ignored 
by implementation
         * @throws DMLRuntimeException if DMLRuntimeException occurs
         */
-       public DataPartitionerLocal(PDataPartitionFormat dpf, int n, int par) 
+       public DataPartitionerLocal(PartitionFormat dpf, int par) 
                throws DMLRuntimeException 
        {
-               super(dpf, n);
+               super(dpf._dpf, dpf._N);
                
-               //TODO
-               if( dpf == PDataPartitionFormat.ROW_BLOCK_WISE_N || dpf == 
PDataPartitionFormat.COLUMN_BLOCK_WISE_N  )
+               if( dpf.isBlockwise() )
                        throw new DMLRuntimeException("Data partitioning formt 
'"+dpf+"' not supported by DataPartitionerLocal" );
                
                _seq = new IDSequence();

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/39f75ca0/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java
index f666feb..5514007 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java
@@ -30,6 +30,7 @@ import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.conf.DMLConfig;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import 
org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
+import 
org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PartitionFormat;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableBlock;
 import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableCell;
@@ -47,23 +48,20 @@ import org.apache.sysml.yarn.DMLAppMasterUtils;
  */
 public class DataPartitionerRemoteMR extends DataPartitioner
 {      
-       
        private long _pfid = -1;
        private int  _numReducers = -1;
        private int  _replication = -1;
-       //private int  _max_retry = -1;
        private boolean _jvmReuse = false;
        private boolean _keepIndexes = false;
        
        
-       public DataPartitionerRemoteMR(PDataPartitionFormat dpf, int n, long 
pfid, int numReducers, int replication, int max_retry, boolean jvmReuse, 
boolean keepIndexes) 
+       public DataPartitionerRemoteMR(PartitionFormat dpf, long pfid, int 
numReducers, int replication, int max_retry, boolean jvmReuse, boolean 
keepIndexes) 
        {
-               super(dpf, n);
+               super(dpf._dpf, dpf._N);
                
                _pfid = pfid;
                _numReducers = numReducers;
                _replication = replication;
-               //_max_retry = max_retry;
                _jvmReuse = jvmReuse;
                _keepIndexes = keepIndexes;
        }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/39f75ca0/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java
index be758d2..1569709 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java
@@ -23,7 +23,7 @@ import org.apache.spark.api.java.JavaPairRDD;
 
 import org.apache.sysml.api.DMLScript;
 import org.apache.sysml.runtime.DMLRuntimeException;
-import 
org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
+import 
org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PartitionFormat;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
@@ -47,9 +47,9 @@ public class DataPartitionerRemoteSpark extends 
DataPartitioner
        private ExecutionContext _ec = null;
        private long _numRed = -1;
        
-       public DataPartitionerRemoteSpark(PDataPartitionFormat dpf, int n, 
ExecutionContext ec, long numRed, boolean keepIndexes) 
+       public DataPartitionerRemoteSpark(PartitionFormat dpf, ExecutionContext 
ec, long numRed, boolean keepIndexes) 
        {
-               super(dpf, n);
+               super(dpf._dpf, dpf._N);
                
                _ec = ec;
                _numRed = numRed;
@@ -78,7 +78,7 @@ public class DataPartitionerRemoteSpark extends 
DataPartitioner
                        int numRed = (int)determineNumReducers(inRdd, mc, 
_numRed);
        
                        //run spark remote data partition job 
-                       DataPartitionerRemoteSparkMapper dpfun = new 
DataPartitionerRemoteSparkMapper(mc, ii, oi, _format);
+                       DataPartitionerRemoteSparkMapper dpfun = new 
DataPartitionerRemoteSparkMapper(mc, ii, oi, _format, _n);
                        DataPartitionerRemoteSparkReducer wfun = new 
DataPartitionerRemoteSparkReducer(fnameNew, oi);
                        inRdd.flatMapToPair(dpfun) //partition the input blocks
                             .groupByKey(numRed)   //group partition blocks     
                  

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/39f75ca0/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkMapper.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkMapper.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkMapper.java
index 9e690d5..0f76156 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkMapper.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkMapper.java
@@ -50,28 +50,24 @@ public class DataPartitionerRemoteSparkMapper extends 
ParWorker implements PairF
        
        private static final long serialVersionUID = 332887624852010957L;
        
-       private long _rlen = -1;
-       private long _clen = -1;
-       private long _brlen = -1;
-       private long _bclen = -1;
-       private long _n = -1;
-       //private InputInfo _ii = null;
-       //private OutputInfo _oi = null;
-       private PDataPartitionFormat _dpf = null;
+       private final long _rlen;
+       private final long _clen;
+       private final long _brlen;
+       private final long _bclen;
+       private PDataPartitionFormat _dpf;
+       private final long _n;
        
-       public DataPartitionerRemoteSparkMapper(MatrixCharacteristics mc, 
InputInfo ii, OutputInfo oi, PDataPartitionFormat dpf) 
+       public DataPartitionerRemoteSparkMapper(MatrixCharacteristics mc, 
InputInfo ii, OutputInfo oi, PDataPartitionFormat dpf, int n) 
                throws DMLRuntimeException
        {
                _rlen = mc.getRows();
                _clen = mc.getCols();
                _brlen = mc.getRowsPerBlock();
                _bclen = mc.getColsPerBlock();
-               //_ii = ii;
-               //_oi = oi;
                _dpf = dpf;
+               _n = n;
        }
 
-
        @Override
        public Iterator<Tuple2<Long, Writable>> call(Tuple2<MatrixIndexes, 
MatrixBlock> arg0) 
                throws Exception 
@@ -113,10 +109,21 @@ public class DataPartitionerRemoteSparkMapper extends 
ParWorker implements PairF
                                break;
                        }
                        case ROW_BLOCK_WISE_N:{ 
-                               PairWritableBlock tmp = new PairWritableBlock();
-                               tmp.indexes = new 
MatrixIndexes(((row_offset%_n)/_brlen)+1, col_offset/_bclen+1);
-                               tmp.block = new MatrixBlock(value2);
-                               ret.add(new Tuple2<Long,Writable>(new 
Long(row_offset/_n+1),tmp));
+                               if( _n >= _brlen ) {
+                                       PairWritableBlock tmp = new 
PairWritableBlock();
+                                       tmp.indexes = new 
MatrixIndexes(((row_offset%_n)/_brlen)+1, col_offset/_bclen+1);
+                                       tmp.block = new MatrixBlock(value2);
+                                       ret.add(new Tuple2<Long,Writable>(new 
Long(row_offset/_n+1),tmp));
+                               }
+                               else {
+                                       for( int i=0; i<rows; i+=_n ) {
+                                               PairWritableBlock tmp = new 
PairWritableBlock();
+                                               tmp.indexes = new 
MatrixIndexes(1, col_offset/_bclen+1);
+                                               tmp.block = 
value2.sliceOperations(i, Math.min(i+(int)_n-1, value2.getNumRows()-1), 
+                                                               0, 
value2.getNumColumns()-1, new MatrixBlock());
+                                               ret.add(new 
Tuple2<Long,Writable>(new Long((row_offset+i)/_n+1),tmp));
+                                       }
+                               }
                                break;
                        }
                        case COLUMN_WISE:{
@@ -137,10 +144,21 @@ public class DataPartitionerRemoteSparkMapper extends 
ParWorker implements PairF
                                break;
                        }
                        case COLUMN_BLOCK_WISE_N: {
-                               PairWritableBlock tmp = new PairWritableBlock();
-                               tmp.indexes = new 
MatrixIndexes(row_offset/_brlen+1, ((col_offset%_n)/_bclen)+1);
-                               tmp.block = new MatrixBlock(value2);
-                               ret.add(new Tuple2<Long,Writable>(new 
Long(col_offset/_n+1),tmp));
+                               if( _n >= _bclen ) {
+                                       PairWritableBlock tmp = new 
PairWritableBlock();
+                                       tmp.indexes = new 
MatrixIndexes(row_offset/_brlen+1, ((col_offset%_n)/_bclen)+1);
+                                       tmp.block = new MatrixBlock(value2);
+                                       ret.add(new Tuple2<Long,Writable>(new 
Long(col_offset/_n+1),tmp));
+                               }
+                               else {
+                                       for( int i=0; i<cols; i+=_n ) {
+                                               PairWritableBlock tmp = new 
PairWritableBlock();
+                                               tmp.indexes = new 
MatrixIndexes(row_offset/_brlen+1, 1);
+                                               tmp.block = 
value2.sliceOperations(0, value2.getNumRows()-1, 
+                                                               i, 
Math.min(i+(int)_n-1, value2.getNumColumns()-1), new MatrixBlock());
+                                               ret.add(new 
Tuple2<Long,Writable>(new Long((col_offset+i)/_n+1),tmp));
+                                       }
+                               }
                                break;
                        }
 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/39f75ca0/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
index 00e72c4..17d5936 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
@@ -60,6 +60,7 @@ import org.apache.sysml.runtime.controlprogram.ProgramBlock;
 import org.apache.sysml.runtime.controlprogram.WhileProgramBlock;
 import 
org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
 import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PExecMode;
+import 
org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PartitionFormat;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject.UpdateType;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
@@ -883,7 +884,8 @@ public class ProgramConverter
                                MatrixFormatMetaData md = 
(MatrixFormatMetaData) dat.getMetaData();
                                MatrixCharacteristics mc = 
md.getMatrixCharacteristics();
                                value = mo.getFileName();
-                               PDataPartitionFormat partFormat = 
(mo.getPartitionFormat()!=null) ? mo.getPartitionFormat() : 
PDataPartitionFormat.NONE;
+                               PartitionFormat partFormat = 
(mo.getPartitionFormat()!=null) ? new PartitionFormat(
+                                               
mo.getPartitionFormat(),mo.getPartitionSize()) : PartitionFormat.NONE;
                                matrixMetaData = new String[9];
                                matrixMetaData[0] = String.valueOf( 
mc.getRows() );
                                matrixMetaData[1] = String.valueOf( 
mc.getCols() );
@@ -1819,14 +1821,14 @@ public class ProgramConverter
                                long nnz = Long.parseLong( st.nextToken() );
                                InputInfo iin = InputInfo.stringToInputInfo( 
st.nextToken() );
                                OutputInfo oin = OutputInfo.stringToOutputInfo( 
st.nextToken() );               
-                               PDataPartitionFormat partFormat = 
PDataPartitionFormat.valueOf( st.nextToken() );
+                               PartitionFormat partFormat = 
PartitionFormat.valueOf( st.nextToken() );
                                UpdateType inplace = UpdateType.valueOf( 
st.nextToken() );
                                MatrixCharacteristics mc = new 
MatrixCharacteristics(rows, cols, brows, bcols, nnz); 
                                MatrixFormatMetaData md = new 
MatrixFormatMetaData( mc, oin, iin );
                                mo.setMetaData( md );
                                mo.setVarName( name );
-                               if( partFormat!=PDataPartitionFormat.NONE )
-                                       mo.setPartitioned( partFormat, -1 ); 
//TODO once we support BLOCKWISE_N we should support it here as well
+                               if( partFormat._dpf != 
PDataPartitionFormat.NONE )
+                                       mo.setPartitioned( partFormat._dpf, 
partFormat._N );
                                mo.setUpdateType(inplace);
                                dat = mo;
                                break;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/39f75ca0/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java
index 293150e..d01368e 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java
@@ -42,7 +42,7 @@ import org.apache.sysml.conf.DMLConfig;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.LocalVariableMap;
 import org.apache.sysml.runtime.controlprogram.ParForProgramBlock;
-import 
org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
+import 
org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PartitionFormat;
 import org.apache.sysml.runtime.controlprogram.caching.CacheStatistics;
 import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
@@ -68,12 +68,11 @@ import org.apache.sysml.yarn.DMLAppMasterUtils;
  */
 public class RemoteDPParForMR
 {
-       
        protected static final Log LOG = 
LogFactory.getLog(RemoteDPParForMR.class.getName());
 
-       public static RemoteParForJobReturn runJob(long pfid, String itervar, 
String matrixvar, String program, String resultFile, MatrixObject input, 
-                                                          PDataPartitionFormat 
dpf, OutputInfo oi, boolean tSparseCol, //config params
-                                                          boolean 
enableCPCaching, int numReducers, int replication, int max_retry)  //opt params
+       public static RemoteParForJobReturn runJob(long pfid, String itervar, 
String matrixvar, String program, 
+                       String resultFile, MatrixObject input, PartitionFormat 
dpf, OutputInfo oi, boolean tSparseCol, //config params
+                       boolean enableCPCaching, int numReducers, int 
replication, int max_retry)  //opt params
                throws DMLRuntimeException
        {
                RemoteParForJobReturn ret = null;
@@ -104,7 +103,8 @@ public class RemoteDPParForMR
                        long clen = input.getNumColumns();
                        int brlen = (int) input.getNumRowsPerBlock();
                        int bclen = (int) input.getNumColumnsPerBlock();
-                       MRJobConfiguration.setPartitioningInfo(job, rlen, clen, 
brlen, bclen, InputInfo.BinaryBlockInputInfo, oi, dpf, 1, input.getFileName(), 
itervar, matrixvar, tSparseCol);
+                       MRJobConfiguration.setPartitioningInfo(job, rlen, clen, 
brlen, bclen, InputInfo.BinaryBlockInputInfo, 
+                                       oi, dpf._dpf, dpf._N, 
input.getFileName(), itervar, matrixvar, tSparseCol);
                        
job.setInputFormat(InputInfo.BinaryBlockInputInfo.inputFormatClass);
                        FileInputFormat.setInputPaths(job, path);
                        

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/39f75ca0/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java
index b801402..a612a3e 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java
@@ -42,6 +42,7 @@ import org.apache.sysml.api.DMLScript;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.LocalVariableMap;
 import 
org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
+import 
org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PartitionFormat;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
@@ -70,7 +71,7 @@ public class RemoteDPParForSpark
        protected static final Log LOG = 
LogFactory.getLog(RemoteDPParForSpark.class.getName());
 
        public static RemoteParForJobReturn runJob(long pfid, String itervar, 
String matrixvar, String program, HashMap<String, byte[]> clsMap,
-                       String resultFile, MatrixObject input, ExecutionContext 
ec, PDataPartitionFormat dpf, OutputInfo oi, 
+                       String resultFile, MatrixObject input, ExecutionContext 
ec, PartitionFormat dpf, OutputInfo oi, 
                        boolean tSparseCol, boolean enableCPCaching, int 
numReducers ) 
                throws DMLRuntimeException
        {
@@ -91,7 +92,7 @@ public class RemoteDPParForSpark
 
                //compute number of reducers (to avoid OOMs and reduce memory 
pressure)
                int numParts = SparkUtils.getNumPreferredPartitions(mc, in);
-               int numParts2 = 
(int)((dpf==PDataPartitionFormat.ROW_BLOCK_WISE) ? mc.getRows() : 
mc.getCols()); 
+               int numParts2 = 
(int)((dpf._dpf==PDataPartitionFormat.ROW_BLOCK_WISE) ? mc.getRows() : 
mc.getCols()); 
                int numReducers2 = Math.max(numReducers, Math.min(numParts, 
numParts2));
                
                //core parfor datapartition-execute (w/ or w/o shuffle, 
depending on data characteristics)
@@ -123,7 +124,7 @@ public class RemoteDPParForSpark
        
        @SuppressWarnings("unchecked")
        private static JavaPairRDD<Long, Writable> 
getPartitionedInput(SparkExecutionContext sec, 
-                       String matrixvar, OutputInfo oi, PDataPartitionFormat 
dpf) 
+                       String matrixvar, OutputInfo oi, PartitionFormat dpf) 
                throws DMLRuntimeException 
        {
                InputInfo ii = InputInfo.BinaryBlockInputInfo;
@@ -153,7 +154,7 @@ public class RemoteDPParForSpark
                {
                        //get input rdd and data partitioning 
                        JavaPairRDD<MatrixIndexes,MatrixBlock> in = 
sec.getBinaryBlockRDDHandleForVariable(matrixvar);
-                       DataPartitionerRemoteSparkMapper dpfun = new 
DataPartitionerRemoteSparkMapper(mc, ii, oi, dpf);
+                       DataPartitionerRemoteSparkMapper dpfun = new 
DataPartitionerRemoteSparkMapper(mc, ii, oi, dpf._dpf, dpf._N);
                        return in.flatMapToPair(dpfun);
                }
                //default binary block input rdd with grouping
@@ -167,22 +168,22 @@ public class RemoteDPParForSpark
                                                
mo.getRDDHandle().getLineageChilds().get(0)).getRDD();
                        
                        //data partitioning of input rdd 
-                       DataPartitionerRemoteSparkMapper dpfun = new 
DataPartitionerRemoteSparkMapper(mc, ii, oi, dpf);
+                       DataPartitionerRemoteSparkMapper dpfun = new 
DataPartitionerRemoteSparkMapper(mc, ii, oi, dpf._dpf, dpf._N);
                        return in.flatMapToPair(dpfun);
                }
        } 
        
        //determines if given input matrix requires grouping of partial 
partition slices
-       private static boolean requiresGrouping(PDataPartitionFormat dpf, 
MatrixObject mo) {
+       private static boolean requiresGrouping(PartitionFormat dpf, 
MatrixObject mo) {
                MatrixCharacteristics mc = mo.getMatrixCharacteristics();
-               return ((dpf == PDataPartitionFormat.ROW_WISE && 
mc.getNumColBlocks() > 1)
-                               || (dpf == PDataPartitionFormat.COLUMN_WISE && 
mc.getNumRowBlocks() > 1))
+               return ((dpf == PartitionFormat.ROW_WISE && 
mc.getNumColBlocks() > 1)
+                               || (dpf == PartitionFormat.COLUMN_WISE && 
mc.getNumRowBlocks() > 1))
                        && !hasInputDataSet(dpf, mo);
        }
        
        //determines if given input matrix wraps input data set applicable to 
direct processing
-       private static boolean hasInputDataSet(PDataPartitionFormat dpf, 
MatrixObject mo) {
-               return (dpf == PDataPartitionFormat.ROW_WISE 
+       private static boolean hasInputDataSet(PartitionFormat dpf, 
MatrixObject mo) {
+               return (dpf == PartitionFormat.ROW_WISE 
                        && mo.getRDDHandle().isCheckpointRDD()
                        && mo.getRDDHandle().getLineageChilds().size()==1
                        && 
mo.getRDDHandle().getLineageChilds().get(0).getLineageChilds().size()==1

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/39f75ca0/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
index 400fbd5..0d368e8 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
@@ -33,6 +33,7 @@ import org.apache.spark.util.LongAccumulator;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.codegen.CodegenUtils;
 import 
org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
+import 
org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PartitionFormat;
 import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysml.runtime.controlprogram.parfor.Task.TaskType;
@@ -70,7 +71,7 @@ public class RemoteDPParForSparkWorker extends ParWorker 
implements PairFlatMapF
        private final LongAccumulator _aIters;
        
        public RemoteDPParForSparkWorker(String program, HashMap<String, 
byte[]> clsMap, String inputVar, String iterVar, 
-                       boolean cpCaching, MatrixCharacteristics mc, boolean 
tSparseCol, PDataPartitionFormat dpf, OutputInfo oinfo, 
+                       boolean cpCaching, MatrixCharacteristics mc, boolean 
tSparseCol, PartitionFormat dpf, OutputInfo oinfo, 
                        LongAccumulator atasks, LongAccumulator aiters) 
                throws DMLRuntimeException
        {
@@ -86,12 +87,12 @@ public class RemoteDPParForSparkWorker extends ParWorker 
implements PairFlatMapF
                _aIters = aiters;
                
                //setup matrix block partition meta data
-               _rlen = (dpf != PDataPartitionFormat.ROW_WISE) ? 
(int)mc.getRows() : 1;
-               _clen = (dpf != PDataPartitionFormat.COLUMN_WISE) ? 
(int)mc.getCols() : 1;
+               _rlen = (dpf != PartitionFormat.ROW_WISE) ? (int)mc.getRows() : 
1;
+               _clen = (dpf != PartitionFormat.COLUMN_WISE) ? 
(int)mc.getCols() : 1;
                _brlen = mc.getRowsPerBlock();
                _bclen = mc.getColsPerBlock();
                _tSparseCol = tSparseCol;
-               _dpf = dpf;
+               _dpf = dpf._dpf;
        }
        
        @Override 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/39f75ca0/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptNode.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptNode.java 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptNode.java
index 26c30d4..22126fe 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptNode.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptNode.java
@@ -28,6 +28,7 @@ import java.util.Set;
 import org.apache.sysml.lops.Lop;
 import org.apache.sysml.runtime.controlprogram.ParForProgramBlock;
 import 
org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
+import 
org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PartitionFormat;
 
 /**
  * Internal representation of a plan alternative for program blocks and 
instructions 
@@ -457,7 +458,7 @@ public class OptNode
                        //partitioned read identified by selected partition 
format
                        String tmp = getParam(ParamType.DATA_PARTITION_FORMAT);
                        ret = ( tmp !=null 
-                                       && 
PDataPartitionFormat.valueOf(tmp)!=PDataPartitionFormat.NONE 
+                                       && 
PartitionFormat.valueOf(tmp)._dpf!=PDataPartitionFormat.NONE 
                                        && flagNested );
                }
                else

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/39f75ca0/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
index 6edcec3..2e4b95b 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
@@ -29,12 +29,12 @@ import org.apache.sysml.parser.ParForStatementBlock;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.LocalVariableMap;
 import org.apache.sysml.runtime.controlprogram.ParForProgramBlock;
-import 
org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
 import 
org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitioner;
 import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PExecMode;
 import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.POptMode;
 import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PResultMerge;
 import 
org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PTaskPartitioner;
+import 
org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PartitionFormat;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import 
org.apache.sysml.runtime.controlprogram.parfor.opt.CostEstimator.TestMeasure;
@@ -107,7 +107,7 @@ public class OptimizerConstrained extends OptimizerRuleBased
                //OPTIMIZE PARFOR PLAN
 
                // rewrite 1: data partitioning (incl. log. recompile RIX)
-               HashMap<String, PDataPartitionFormat> partitionedMatrices = new 
HashMap<String,PDataPartitionFormat>();
+               HashMap<String, PartitionFormat> partitionedMatrices = new 
HashMap<String, PartitionFormat>();
                rewriteSetDataPartitioner( pn, ec.getVariables(), 
partitionedMatrices, OptimizerUtils.getLocalMemBudget() );
                double M0b = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn); 
//reestimate
 
@@ -230,7 +230,7 @@ public class OptimizerConstrained extends OptimizerRuleBased
        ///
 
        @Override
-       protected boolean rewriteSetDataPartitioner(OptNode n, LocalVariableMap 
vars, HashMap<String,PDataPartitionFormat> partitionedMatrices, double thetaM)
+       protected boolean rewriteSetDataPartitioner(OptNode n, LocalVariableMap 
vars, HashMap<String,PartitionFormat> partitionedMatrices, double thetaM)
                throws DMLRuntimeException
        {
                boolean blockwise = false;
@@ -362,7 +362,7 @@ public class OptimizerConstrained extends OptimizerRuleBased
        //REWRITE set fused data partitioning / execution
        ///
 
-       protected void rewriteSetFusedDataPartitioningExecution(OptNode pn, 
double M, boolean flagLIX, HashMap<String, PDataPartitionFormat> 
partitionedMatrices, LocalVariableMap vars, PExecMode emode)
+       protected void rewriteSetFusedDataPartitioningExecution(OptNode pn, 
double M, boolean flagLIX, HashMap<String, PartitionFormat> 
partitionedMatrices, LocalVariableMap vars, PExecMode emode)
                throws DMLRuntimeException
        {
                if( emode == PExecMode.REMOTE_MR_DP || emode == 
PExecMode.REMOTE_SPARK_DP )
@@ -378,15 +378,15 @@ public class OptimizerConstrained extends 
OptimizerRuleBased
                        }
 
                        String moVarname = 
partitionedMatrices.keySet().iterator().next();
-                       PDataPartitionFormat moDpf = 
partitionedMatrices.get(moVarname);
+                       PartitionFormat moDpf = 
partitionedMatrices.get(moVarname);
                        MatrixObject mo = (MatrixObject)vars.get(moVarname);
 
                        //check if access via iteration variable and sizes match
                        String iterVarname = pfpb.getIterablePredicateVars()[0];
 
                        if( rIsAccessByIterationVariable(pn, moVarname, 
iterVarname) &&
-                          ((moDpf==PDataPartitionFormat.ROW_WISE && 
mo.getNumRows()==_N ) ||
-                               (moDpf==PDataPartitionFormat.COLUMN_WISE && 
mo.getNumColumns()==_N)) )
+                          ((moDpf==PartitionFormat.ROW_WISE && 
mo.getNumRows()==_N ) ||
+                               (moDpf==PartitionFormat.COLUMN_WISE && 
mo.getNumColumns()==_N)) )
                        {
                                int k = (int)Math.min(_N,_rk2);
 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/39f75ca0/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
index 554b217..0e7170f 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
@@ -82,6 +82,7 @@ import 
org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PExecMode;
 import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.POptMode;
 import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PResultMerge;
 import 
org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PTaskPartitioner;
+import 
org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PartitionFormat;
 import org.apache.sysml.runtime.controlprogram.WhileProgramBlock;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject.UpdateType;
@@ -239,7 +240,7 @@ public class OptimizerRuleBased extends Optimizer
                //OPTIMIZE PARFOR PLAN
                
                // rewrite 1: data partitioning (incl. log. recompile RIX and 
flag opt nodes)
-               HashMap<String, PDataPartitionFormat> partitionedMatrices = new 
HashMap<String,PDataPartitionFormat>();
+               HashMap<String, PartitionFormat> partitionedMatrices = new 
HashMap<String, PartitionFormat>();
                rewriteSetDataPartitioner( pn, ec.getVariables(), 
partitionedMatrices, OptimizerUtils.getLocalMemBudget() );
                double M0b = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn); 
//reestimate
                
@@ -403,7 +404,7 @@ public class OptimizerRuleBased extends Optimizer
        //REWRITE set data partitioner
        ///
 
-       protected boolean rewriteSetDataPartitioner(OptNode n, LocalVariableMap 
vars, HashMap<String, PDataPartitionFormat> partitionedMatrices, double thetaM 
) 
+       protected boolean rewriteSetDataPartitioner(OptNode n, LocalVariableMap 
vars, HashMap<String, PartitionFormat> partitionedMatrices, double thetaM ) 
                throws DMLRuntimeException
        {
                if( n.getNodeType() != NodeType.PARFOR )
@@ -423,13 +424,13 @@ public class OptimizerRuleBased extends Optimizer
                        && (_N >= PROB_SIZE_THRESHOLD_PARTITIONING || _Nmax >= 
PROB_SIZE_THRESHOLD_PARTITIONING) ) //only if beneficial wrt problem size
                {
                        ArrayList<String> cand = pfsb.getReadOnlyParentVars();
-                       HashMap<String, PDataPartitionFormat> cand2 = new 
HashMap<String, PDataPartitionFormat>();
+                       HashMap<String, PartitionFormat> cand2 = new 
HashMap<String, PartitionFormat>();
                        for( String c : cand )
                        {
-                               PDataPartitionFormat dpf = 
pfsb.determineDataPartitionFormat( c );
+                               PartitionFormat dpf = 
pfsb.determineDataPartitionFormat( c );
                                
-                               if( dpf != PDataPartitionFormat.NONE 
-                                       && dpf != 
PDataPartitionFormat.BLOCK_WISE_M_N ) 
+                               if( dpf != PartitionFormat.NONE 
+                                       && dpf._dpf != 
PDataPartitionFormat.BLOCK_WISE_M_N ) 
                                {
                                        cand2.put( c, dpf );
                                }       
@@ -459,7 +460,7 @@ public class OptimizerRuleBased extends Optimizer
                return blockwise;
        }
 
-       protected boolean rFindDataPartitioningCandidates( OptNode n, 
HashMap<String, PDataPartitionFormat> cand, LocalVariableMap vars, double 
thetaM ) 
+       protected boolean rFindDataPartitioningCandidates( OptNode n, 
HashMap<String, PartitionFormat> cand, LocalVariableMap vars, double thetaM ) 
                throws DMLRuntimeException
        {
                boolean ret = false;
@@ -477,7 +478,7 @@ public class OptimizerRuleBased extends Optimizer
                        String inMatrix = h.getInput().get(0).getName();
                        if( cand.containsKey(inMatrix) ) //Required Condition: 
partitioning applicable
                        {
-                               PDataPartitionFormat dpf = cand.get(inMatrix);
+                               PartitionFormat dpf = cand.get(inMatrix);
                                double mnew = getNewRIXMemoryEstimate( n, 
inMatrix, dpf, vars );
                                //NOTE: for the moment, we do not partition 
according to the remote mem, because we can execute 
                                //it even without partitioning in CP. However, 
advanced optimizers should reason about this                                    
    
@@ -516,7 +517,7 @@ public class OptimizerRuleBased extends Optimizer
         * @return memory estimate
         * @throws DMLRuntimeException if DMLRuntimeException occurs
         */
-       protected double getNewRIXMemoryEstimate( OptNode n, String varName, 
PDataPartitionFormat dpf, LocalVariableMap vars ) 
+       protected double getNewRIXMemoryEstimate( OptNode n, String varName, 
PartitionFormat dpf, LocalVariableMap vars ) 
                throws DMLRuntimeException
        {
                double mem = -1;
@@ -528,7 +529,7 @@ public class OptimizerRuleBased extends Optimizer
                        MatrixObject mo = (MatrixObject) dat;
                        
                        //those are worst-case (dense) estimates
-                       switch( dpf )
+                       switch( dpf._dpf )
                        {
                                case COLUMN_WISE:
                                        mem = 
OptimizerUtils.estimateSize(mo.getNumRows(), 1); 
@@ -536,10 +537,12 @@ public class OptimizerRuleBased extends Optimizer
                                case ROW_WISE:
                                        mem = OptimizerUtils.estimateSize(1, 
mo.getNumColumns());
                                        break;
-                               case BLOCK_WISE_M_N:
-                                       mem = Integer.MAX_VALUE; //TODO
+                               case COLUMN_BLOCK_WISE_N:
+                                       mem = 
OptimizerUtils.estimateSize(mo.getNumRows(), dpf._N); 
                                        break;
-                                       
+                               case ROW_BLOCK_WISE_N:
+                                       mem = 
OptimizerUtils.estimateSize(dpf._N, mo.getNumColumns()); 
+                                       break;  
                                default:
                                        //do nothing
                        }       
@@ -586,7 +589,7 @@ public class OptimizerRuleBased extends Optimizer
                        return LopProperties.ExecType.CP_FILE;
        }
 
-       public static boolean allowsBinaryCellPartitions( MatrixObject mo, 
PDataPartitionFormat dpf ) 
+       public static boolean allowsBinaryCellPartitions( MatrixObject mo, 
PartitionFormat dpf ) 
                throws DMLRuntimeException
        {
                return (getRIXExecType(mo, 
PDataPartitionFormat.COLUMN_BLOCK_WISE, false)==LopProperties.ExecType.CP );
@@ -1031,11 +1034,11 @@ public class OptimizerRuleBased extends Optimizer
                             && 
n.getParam(ParamType.OPSTRING).equals(IndexingOp.OPSTRING)
                             && n.getParam(ParamType.DATA_PARTITION_FORMAT) != 
null )
                {
-                       PDataPartitionFormat dpf = 
PDataPartitionFormat.valueOf(n.getParam(ParamType.DATA_PARTITION_FORMAT));
+                       PartitionFormat dpf = 
PartitionFormat.valueOf(n.getParam(ParamType.DATA_PARTITION_FORMAT));
                        Hop h = 
OptTreeConverter.getAbstractPlanMapping().getMappedHop(n.getID());
                        String inMatrix = h.getInput().get(0).getName();
                        String indexAccess = null;
-                       switch( dpf )
+                       switch( dpf._dpf )
                        {
                                case ROW_WISE: //input 1 and 2 eq
                                        if( h.getInput().get(1) instanceof 
DataOp )
@@ -1072,7 +1075,7 @@ public class OptimizerRuleBased extends Optimizer
         * @param vars local variable map
         * @throws DMLRuntimeException if DMLRuntimeException occurs
         */
-       protected void rewriteSetPartitionReplicationFactor( OptNode n, 
HashMap<String, PDataPartitionFormat> partitionedMatrices, LocalVariableMap 
vars ) 
+       protected void rewriteSetPartitionReplicationFactor( OptNode n, 
HashMap<String, PartitionFormat> partitionedMatrices, LocalVariableMap vars ) 
                throws DMLRuntimeException
        {
                boolean apply = false;
@@ -1487,7 +1490,7 @@ public class OptimizerRuleBased extends Optimizer
         * @param vars local variable map
         * @throws DMLRuntimeException if DMLRuntimeException occurs
         */
-       protected void rewriteSetFusedDataPartitioningExecution(OptNode pn, 
double M, boolean flagLIX, HashMap<String, PDataPartitionFormat> 
partitionedMatrices, LocalVariableMap vars) 
+       protected void rewriteSetFusedDataPartitioningExecution(OptNode pn, 
double M, boolean flagLIX, HashMap<String, PartitionFormat> 
partitionedMatrices, LocalVariableMap vars) 
                throws DMLRuntimeException 
        {
                //assertions (warnings of corrupt optimizer decisions)
@@ -1513,15 +1516,15 @@ public class OptimizerRuleBased extends Optimizer
                        
                        //partitioned matrix
                        String moVarname = 
partitionedMatrices.keySet().iterator().next();
-                       PDataPartitionFormat moDpf = 
partitionedMatrices.get(moVarname);
+                       PartitionFormat moDpf = 
partitionedMatrices.get(moVarname);
                        MatrixObject mo = (MatrixObject)vars.get(moVarname);
                        
                        //check if access via iteration variable and sizes match
                        String iterVarname = pfpb.getIterablePredicateVars()[0];
                        
                        if( rIsAccessByIterationVariable(pn, moVarname, 
iterVarname) &&
-                          ((moDpf==PDataPartitionFormat.ROW_WISE && 
mo.getNumRows()==_N ) ||
-                               (moDpf==PDataPartitionFormat.COLUMN_WISE && 
mo.getNumColumns()==_N)) )
+                          ((moDpf==PartitionFormat.ROW_WISE && 
mo.getNumRows()==_N ) ||
+                               (moDpf==PartitionFormat.COLUMN_WISE && 
mo.getNumColumns()==_N)) )
                        {
                                int k = (int)Math.min(_N,_rk2);
                                
@@ -1554,11 +1557,11 @@ public class OptimizerRuleBased extends Optimizer
                             && 
n.getParam(ParamType.OPSTRING).equals(IndexingOp.OPSTRING)
                             && n.getParam(ParamType.DATA_PARTITION_FORMAT) != 
null )
                {
-                       PDataPartitionFormat dpf = 
PDataPartitionFormat.valueOf(n.getParam(ParamType.DATA_PARTITION_FORMAT));
+                       PartitionFormat dpf = 
PartitionFormat.valueOf(n.getParam(ParamType.DATA_PARTITION_FORMAT));
                        Hop h = 
OptTreeConverter.getAbstractPlanMapping().getMappedHop(n.getID());
                        String inMatrix = h.getInput().get(0).getName();
                        String indexAccess = null;
-                       switch( dpf )
+                       switch( dpf._dpf )
                        {
                                case ROW_WISE: //input 1 and 2 eq
                                        if( h.getInput().get(1) instanceof 
DataOp )
@@ -1584,7 +1587,7 @@ public class OptimizerRuleBased extends Optimizer
        //REWRITE transpose sparse vector operations
        ///
        
-       protected void rewriteSetTranposeSparseVectorOperations(OptNode pn, 
HashMap<String, PDataPartitionFormat> partitionedMatrices, LocalVariableMap 
vars) 
+       protected void rewriteSetTranposeSparseVectorOperations(OptNode pn, 
HashMap<String, PartitionFormat> partitionedMatrices, LocalVariableMap vars) 
                throws DMLRuntimeException 
        {
                //assertions (warnings of corrupt optimizer decisions)
@@ -1600,11 +1603,11 @@ public class OptimizerRuleBased extends Optimizer
                        && partitionedMatrices.size()==1 ) //general applicable
                {
                        String moVarname = 
partitionedMatrices.keySet().iterator().next();
-                       PDataPartitionFormat moDpf = 
partitionedMatrices.get(moVarname);
+                       PartitionFormat moDpf = 
partitionedMatrices.get(moVarname);
                        Data dat = vars.get(moVarname);
                        
                        if(    dat !=null && dat instanceof MatrixObject 
-                               && moDpf == PDataPartitionFormat.COLUMN_WISE    
+                               && moDpf == PartitionFormat.COLUMN_WISE 
                                && 
((MatrixObject)dat).getSparsity()<=MatrixBlock.SPARSITY_TURN_POINT  //check for 
sparse matrix
                                && rIsTransposeSafePartition(pn, moVarname) ) 
//tranpose-safe
                        {
@@ -2541,7 +2544,7 @@ public class OptimizerRuleBased extends Optimizer
        //REWRITE enable runtime piggybacking
        ///
 
-       protected void rewriteEnableRuntimePiggybacking( OptNode n, 
LocalVariableMap vars, HashMap<String, PDataPartitionFormat> 
partitionedMatrices ) 
+       protected void rewriteEnableRuntimePiggybacking( OptNode n, 
LocalVariableMap vars, HashMap<String, PartitionFormat> partitionedMatrices ) 
                throws DMLRuntimeException
        {
                ParForProgramBlock pfpb = (ParForProgramBlock) OptTreeConverter

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/39f75ca0/src/main/java/org/apache/sysml/runtime/instructions/cpfile/MatrixIndexingCPFileInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/cpfile/MatrixIndexingCPFileInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/cpfile/MatrixIndexingCPFileInstruction.java
index 7b6bb29..a1e9ca1 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/cpfile/MatrixIndexingCPFileInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/cpfile/MatrixIndexingCPFileInstruction.java
@@ -114,9 +114,15 @@ public final class MatrixIndexingCPFileInstruction extends 
IndexingCPInstruction
                                        case ROW_WISE:
                                                mcNew = new 
MatrixCharacteristics( 1, mc.getCols(), mc.getRowsPerBlock(), 
mc.getColsPerBlock() );
                                                break;
+                                       case ROW_BLOCK_WISE_N:
+                                               mcNew = new 
MatrixCharacteristics( mo.getPartitionSize(), mc.getCols(), 
mc.getRowsPerBlock(), mc.getColsPerBlock() );
+                                               break;  
                                        case COLUMN_WISE:
                                                mcNew = new 
MatrixCharacteristics( mc.getRows(), 1, mc.getRowsPerBlock(), 
mc.getColsPerBlock() );
-                                               break;                          
        
+                                               break;
+                                       case COLUMN_BLOCK_WISE_N:
+                                               mcNew = new 
MatrixCharacteristics( mc.getRows(), mo.getPartitionSize(), 
mc.getRowsPerBlock(), mc.getColsPerBlock() );
+                                               break;  
                                        default:
                                                throw new 
DMLRuntimeException("Unsupported partition format for CP_FILE rangeReIndex: "+ 
mo.getPartitionFormat());
                                }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/39f75ca0/src/main/java/org/apache/sysml/runtime/matrix/DataPartitionMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/DataPartitionMR.java 
b/src/main/java/org/apache/sysml/runtime/matrix/DataPartitionMR.java
index fc05fb1..3675194 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/DataPartitionMR.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/DataPartitionMR.java
@@ -22,6 +22,7 @@ package org.apache.sysml.runtime.matrix;
 
 import org.apache.sysml.runtime.DMLRuntimeException;
 import 
org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
+import 
org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PartitionFormat;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysml.runtime.controlprogram.parfor.DataPartitioner;
 import org.apache.sysml.runtime.controlprogram.parfor.DataPartitionerRemoteMR;
@@ -81,8 +82,8 @@ public class DataPartitionMR
                                        default: 
                                                throw new 
DMLRuntimeException("Unsupported partition format for distributed cache input: 
"+pformat);
                                }
-                               
-                               DataPartitioner dpart = new 
DataPartitionerRemoteMR(pformat, (int)N, -1, numReducers, replication, 4, 
false, true);
+                               PartitionFormat pf = new 
PartitionFormat(pformat, (int)N);
+                               DataPartitioner dpart = new 
DataPartitionerRemoteMR(pf, -1, numReducers, replication, 4, false, true);
                                out = dpart.createPartitionedMatrixObject(in, 
out, true);
                                
                                sts[i] = out.getMatrixCharacteristics();

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/39f75ca0/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForBlockwiseDataPartitioningTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForBlockwiseDataPartitioningTest.java
 
b/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForBlockwiseDataPartitioningTest.java
new file mode 100644
index 0000000..a01d22e
--- /dev/null
+++ 
b/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForBlockwiseDataPartitioningTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.parfor;
+
+import java.util.HashMap;
+
+import org.junit.Test;
+
+import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
+import org.apache.sysml.conf.CompilerConfig;
+import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitioner;
+import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PExecMode;
+import org.apache.sysml.runtime.matrix.data.MatrixValue.CellIndex;
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.integration.TestConfiguration;
+import org.apache.sysml.test.utils.TestUtils;
+
+public class ParForBlockwiseDataPartitioningTest extends AutomatedTestBase 
+{      
+       //positive test cases, i.e., test cases where row/column block partitioning applied
+       private final static String TEST_NAME1 = "parfor_brdatapartitioning_pos";
+       private final static String TEST_NAME2 = "parfor_bcdatapartitioning_pos";
+       //negative test cases, i.e., test cases where row/column block partitioning not applied
+       private final static String TEST_NAME3 = "parfor_brdatapartitioning_neg";
+       private final static String TEST_NAME4 = "parfor_bcdatapartitioning_neg";
+       
+       private final static String TEST_DIR = "functions/parfor/";
+       private final static String TEST_CLASS_DIR = TEST_DIR + ParForBlockwiseDataPartitioningTest.class.getSimpleName() + "/";
+       private final static double eps = 1e-10;
+       
+       //moderate data size, force spark rix via unknowns 
+       private final static int rows = (int)1014; 
+       private final static int cols = (int)2024;
+       private final static double sparsity1 = 0.7;
+       private final static double sparsity2 = 0.01;
+       
+       @Override
+       public void setUp() {
+               TestUtils.clearAssertionInformation();
+               addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] { "Rout" }) ); 
+               addTestConfiguration(TEST_NAME2, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME2, new String[] { "Rout" }) ); 
+               addTestConfiguration(TEST_NAME3, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME3, new String[] { "Rout" }) ); 
+               addTestConfiguration(TEST_NAME4, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME4, new String[] { "Rout" }) ); 
+       }
+
+       
+       @Test
+       public void testParForRowBlockPartitioningLocalLocalDense() {
+               runParForDataPartitioningTest(TEST_NAME1, PDataPartitioner.LOCAL, PExecMode.LOCAL, false);
+       }
+
+       @Test
+       public void testParForRowBlockPartitioningLocalRemoteSparkDense() {
+               runParForDataPartitioningTest(TEST_NAME1, PDataPartitioner.LOCAL, PExecMode.REMOTE_SPARK, false);
+       }       
+
+       @Test
+       public void testParForRowBlockPartitioningRemoteSparkLocalDense() {
+               runParForDataPartitioningTest(TEST_NAME1, PDataPartitioner.REMOTE_SPARK, PExecMode.LOCAL, false);
+       }
+
+       @Test
+       public void testParForRowBlockPartitioningRemoteSparkRemoteDense() {
+               runParForDataPartitioningTest(TEST_NAME1, PDataPartitioner.REMOTE_SPARK, PExecMode.REMOTE_SPARK, false);
+       }
+
+       @Test
+       public void testParForRowBlockPartitioningLocalLocalSparse() {
+               runParForDataPartitioningTest(TEST_NAME1, PDataPartitioner.LOCAL, PExecMode.LOCAL, true);
+       }
+
+       @Test
+       public void testParForRowBlockPartitioningLocalRemoteSparkSparse() {
+               runParForDataPartitioningTest(TEST_NAME1, PDataPartitioner.LOCAL, PExecMode.REMOTE_SPARK, true);
+       }       
+
+       @Test
+       public void testParForRowBlockPartitioningRemoteSparkLocalSparse() {
+               runParForDataPartitioningTest(TEST_NAME1, PDataPartitioner.REMOTE_SPARK, PExecMode.LOCAL, true);
+       }
+
+       @Test
+       public void testParForRowBlockPartitioningRemoteSparkRemoteSparse() {
+               runParForDataPartitioningTest(TEST_NAME1, PDataPartitioner.REMOTE_SPARK, PExecMode.REMOTE_SPARK, true);
+       }
+
+       @Test
+       public void testParForColBlockPartitioningLocalLocalDense() {
+               runParForDataPartitioningTest(TEST_NAME2, PDataPartitioner.LOCAL, PExecMode.LOCAL, false);
+       }
+
+       @Test
+       public void testParForColBlockPartitioningLocalRemoteSparkDense() {
+               runParForDataPartitioningTest(TEST_NAME2, PDataPartitioner.LOCAL, PExecMode.REMOTE_SPARK, false);
+       }       
+
+       @Test
+       public void testParForColBlockPartitioningRemoteSparkLocalDense() {
+               runParForDataPartitioningTest(TEST_NAME2, PDataPartitioner.REMOTE_SPARK, PExecMode.LOCAL, false);
+       }
+
+       @Test
+       public void testParForColBlockPartitioningRemoteSparkRemoteDense() {
+               runParForDataPartitioningTest(TEST_NAME2, PDataPartitioner.REMOTE_SPARK, PExecMode.REMOTE_SPARK, false);
+       }
+
+       @Test
+       public void testParForColBlockPartitioningLocalLocalSparse() {
+               runParForDataPartitioningTest(TEST_NAME2, PDataPartitioner.LOCAL, PExecMode.LOCAL, true);
+       }
+
+       @Test
+       public void testParForColBlockPartitioningLocalRemoteSparkSparse() {
+               runParForDataPartitioningTest(TEST_NAME2, PDataPartitioner.LOCAL, PExecMode.REMOTE_SPARK, true);
+       }       
+
+       @Test
+       public void testParForColBlockPartitioningRemoteSparkLocalSparse() {
+               runParForDataPartitioningTest(TEST_NAME2, PDataPartitioner.REMOTE_SPARK, PExecMode.LOCAL, true);
+       }
+
+       @Test
+       public void testParForColBlockPartitioningRemoteSparkRemoteSparse() {
+               runParForDataPartitioningTest(TEST_NAME2, PDataPartitioner.REMOTE_SPARK, PExecMode.REMOTE_SPARK, true);
+       }
+       
+       //negative examples
+       
+       @Test
+       public void testParForRowBlockPartitioningRemoteSparkLocalSparseNegative() {
+               runParForDataPartitioningTest(TEST_NAME3, PDataPartitioner.REMOTE_SPARK, PExecMode.LOCAL, true);
+       }
+       
+       @Test
+       public void testParForRowBlockPartitioningRemoteSparkRemoteSparseNegative() {
+               runParForDataPartitioningTest(TEST_NAME3, PDataPartitioner.REMOTE_SPARK, PExecMode.REMOTE_SPARK, true);
+       }
+       
+       @Test
+       public void testParForColBlockPartitioningRemoteSparkLocalSparseNegative() {
+               runParForDataPartitioningTest(TEST_NAME4, PDataPartitioner.REMOTE_SPARK, PExecMode.LOCAL, true);
+       }
+       
+       @Test
+       public void testParForColBlockPartitioningRemoteSparkRemoteSparseNegative() {
+               runParForDataPartitioningTest(TEST_NAME4, PDataPartitioner.REMOTE_SPARK, PExecMode.REMOTE_SPARK, true);
+       }
+       
+       private void runParForDataPartitioningTest( String testname, PDataPartitioner partitioner, PExecMode mode, boolean sparse )
+       {
+               RUNTIME_PLATFORM oldRT = rtplatform;
+               boolean oldUseSparkConfig = DMLScript.USE_LOCAL_SPARK_CONFIG;
+               boolean oldDynRecompile = CompilerConfig.FLAG_DYN_RECOMPILE;
+               
+               //run always in spark execution mode
+               DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+               rtplatform = RUNTIME_PLATFORM.HYBRID_SPARK;
+               
+               try
+               {
+                       TestConfiguration config = getTestConfiguration(testname);
+                       loadTestConfiguration(config);
+                       
+                       CompilerConfig.FLAG_DYN_RECOMPILE = false;
+                       
+                       String HOME = SCRIPT_DIR + TEST_DIR;
+                       fullDMLScriptName = HOME + testname + ".dml";
+                       programArgs = new String[]{"-args", input("V"), 
+                               partitioner.name(), mode.name(), output("R") };
+                       
+                       fullRScriptName = HOME + testname + ".R";
+                       rCmd = "Rscript" + " " + fullRScriptName + " " + inputDir() + " " + expectedDir();
+       
+                       //generate input data
+                       int lrows = testname.equals(TEST_NAME1) || testname.equals(TEST_NAME3) ? rows : cols;
+                       int lcols = testname.equals(TEST_NAME1) || testname.equals(TEST_NAME3) ? cols : rows;
+                       double lsparsity = sparse ? sparsity2 : sparsity1;
+                       double[][] V = getRandomMatrix(lrows, lcols, 0, 1, lsparsity, System.nanoTime());
+                       writeInputMatrixWithMTD("V", V, true);
+       
+                       //run test
+                       runTest(true, false, null, -1);
+                       runRScript(true);
+                       
+                       //compare matrices
+                       HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromHDFS("R");
+                       HashMap<CellIndex, Double> rfile  = readRMatrixFromFS("Rout");
+                       TestUtils.compareMatrices(dmlfile, rfile, eps, "DML", "R");
+               }
+               finally
+               {
+                       rtplatform = oldRT;
+                       DMLScript.USE_LOCAL_SPARK_CONFIG = oldUseSparkConfig;
+                       CompilerConfig.FLAG_DYN_RECOMPILE = oldDynRecompile;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/39f75ca0/src/test/scripts/functions/parfor/parfor_bcdatapartitioning_neg.R
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/parfor/parfor_bcdatapartitioning_neg.R b/src/test/scripts/functions/parfor/parfor_bcdatapartitioning_neg.R
new file mode 100644
index 0000000..488d237
--- /dev/null
+++ b/src/test/scripts/functions/parfor/parfor_bcdatapartitioning_neg.R
@@ -0,0 +1,37 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+
+args <- commandArgs(TRUE)
+options(digits=22)
+
+library("Matrix")
+
+X = as.matrix(readMM(paste(args[1], "V.mtx", sep="")))
+N = 200;
+
+R = matrix(0, ceiling(ncol(X)/N), 1); 
+for( bi in 1:ceiling(ncol(X)/N)) {
+   Xbi = X[,(7+(bi-1)*N+1):min(bi*N,ncol(X))];   
+   R[bi,1] = sum(Xbi); 
+}   
+
+writeMM(as(R, "CsparseMatrix"), paste(args[2], "Rout", sep=""));

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/39f75ca0/src/test/scripts/functions/parfor/parfor_bcdatapartitioning_neg.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/parfor/parfor_bcdatapartitioning_neg.dml b/src/test/scripts/functions/parfor/parfor_bcdatapartitioning_neg.dml
new file mode 100644
index 0000000..5a08430
--- /dev/null
+++ b/src/test/scripts/functions/parfor/parfor_bcdatapartitioning_neg.dml
@@ -0,0 +1,37 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+
+X = read($1);
+N = 200;
+
+if( sum(X)<0 ) #cause unknown sparsity
+   X = matrix(1, rows=100000000, cols=ncol(X));
+
+R = matrix(0, rows=ceil(ncol(X)/N), cols=1); 
+
+parfor( bi in 1:ceil(ncol(X)/N), opt=CONSTRAINED, datapartitioner=$2, mode=$3, log=DEBUG) {
+   Xbi = X[,(7+(bi-1)*N+1):min(bi*N,ncol(X))];   
+   print(sum(Xbi));
+   R[bi,1] = sum(Xbi); 
+}   
+
+write(R, $4);       
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/39f75ca0/src/test/scripts/functions/parfor/parfor_bcdatapartitioning_pos.R
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/parfor/parfor_bcdatapartitioning_pos.R b/src/test/scripts/functions/parfor/parfor_bcdatapartitioning_pos.R
new file mode 100644
index 0000000..62413b5
--- /dev/null
+++ b/src/test/scripts/functions/parfor/parfor_bcdatapartitioning_pos.R
@@ -0,0 +1,37 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+
+args <- commandArgs(TRUE)
+options(digits=22)
+
+library("Matrix")
+
+X = as.matrix(readMM(paste(args[1], "V.mtx", sep="")))
+N = 200;
+
+R = matrix(0, ceiling(ncol(X)/N), 1); 
+for( bi in 1:ceiling(ncol(X)/N)) {
+   Xbi = X[,((bi-1)*N+1):min(bi*N,ncol(X))];   
+   R[bi,1] = sum(Xbi); 
+}   
+
+writeMM(as(R, "CsparseMatrix"), paste(args[2], "Rout", sep=""));

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/39f75ca0/src/test/scripts/functions/parfor/parfor_bcdatapartitioning_pos.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/parfor/parfor_bcdatapartitioning_pos.dml b/src/test/scripts/functions/parfor/parfor_bcdatapartitioning_pos.dml
new file mode 100644
index 0000000..0e6b56a
--- /dev/null
+++ b/src/test/scripts/functions/parfor/parfor_bcdatapartitioning_pos.dml
@@ -0,0 +1,37 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+
+X = read($1);
+N = 200;
+
+if( sum(X)<0 ) #cause unknown sparsity
+   X = matrix(1, rows=100000000, cols=ncol(X));
+
+R = matrix(0, rows=ceil(ncol(X)/N), cols=1); 
+
+parfor( bi in 1:ceil(ncol(X)/N), opt=CONSTRAINED, datapartitioner=$2, mode=$3, log=DEBUG) {
+   Xbi = X[,((bi-1)*N+1):min(bi*N,ncol(X))];   
+   print(sum(Xbi));
+   R[bi,1] = sum(Xbi); 
+}   
+
+write(R, $4);       


Reply via email to