[SYSTEMML-1877] Fix setup parfor data-partition-execute (univar stats)

This patch fixes a setup issue for column-wise partitioning in the fused
parfor data-partition-execute Spark job. To avoid such issues in the
future, it also refactors the code to centralize the analysis of
partition sizes and adds test cases that force the fused
data-partition-execute job for both MR and Spark.
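
As a rough illustration of the centralized partition-size analysis, the
sketch below shows how a remote worker can now derive its partition shape
from the new PartitionFormat helpers (see the ParForProgramBlock diff
below); the concrete matrix dimensions are hypothetical examples, not
values taken from the patch:

  //hypothetical 2001 x 101 input with 1000 x 1000 blocks, partitioned COLUMN_WISE
  MatrixCharacteristics mc = new MatrixCharacteristics(2001, 101, 1000, 1000);
  PartitionFormat dpf = new PartitionFormat(PDataPartitionFormat.COLUMN_WISE, -1);
  int rlen = (int) dpf.getNumRows(mc);    //2001: a column partition spans all rows
  int clen = (int) dpf.getNumColumns(mc); //1: exactly one column per partition

Previously, each worker derived these sizes in its own switch over the
partition format, which is where the column-wise setup of the Spark
worker went wrong.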

Additionally, this patch fixes the constrained parfor optimizer to
consider the fused data-partition-execute job (previously, this job was
never selected, even when both data partitioning and execution were
forced to the corresponding remote execution types).
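
Conceptually, the constrained optimizer now runs the rule-based
partitioning rewrite first (so the partitioning information needed to
later select the fused job is always collected) and only afterwards
re-applies an explicitly forced data partitioner. A condensed sketch of
the new control flow in OptimizerConstrained.rewriteSetDataPartitioner
(simplified from the diff below, surrounding details omitted):

  //run the rule-based rewrite first to collect partitioning information
  String initPlan = n.getParam(ParamType.DATA_PARTITIONER);
  boolean blockwise = super.rewriteSetDataPartitioner(n, vars, partitionedMatrices, thetaM);
  //then re-apply a user-forced data partitioner, if any
  if( !initPlan.equals(PDataPartitioner.UNSPECIFIED.name()) ) {
      ParForProgramBlock pfpb = (ParForProgramBlock) OptTreeConverter
          .getAbstractPlanMapping().getMappedProg(n.getID())[1];
      pfpb.setDataPartitioner(PDataPartitioner.valueOf(initPlan));
  }
  return blockwise;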


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/9178a954
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/9178a954
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/9178a954

Branch: refs/heads/master
Commit: 9178a95495227e978dcc1049f04431ddbd2c4fc5
Parents: ec35215
Author: Matthias Boehm <[email protected]>
Authored: Thu Aug 31 22:39:32 2017 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Fri Sep 1 12:13:25 2017 -0700

----------------------------------------------------------------------
 .../controlprogram/ParForProgramBlock.java      |  29 ++++
 .../parfor/DataPartitionerRemoteMapper.java     |  28 ++--
 .../parfor/RemoteDPParForSparkWorker.java       |  21 +--
 .../parfor/RemoteDPParWorkerReducer.java        |  33 ++--
 .../RemoteParForColocatedNLineInputFormat.java  |  20 +--
 .../parfor/opt/OptimizerConstrained.java        |  21 ++-
 .../parfor/opt/OptimizerRuleBased.java          |   3 +-
 .../matrix/mapred/MRJobConfiguration.java       |  47 ++----
 .../parfor/ParForDataPartitionExecuteTest.java  | 153 +++++++++++++++++++
 .../functions/parfor/DataPartitionExecute.R     |  41 +++++
 .../functions/parfor/DataPartitionExecute.dml   |  42 +++++
 .../functions/parfor/ZPackageSuite.java         |   1 +
 12 files changed, 330 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
index 3a9bf51..ce8bbed 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
@@ -215,6 +215,11 @@ public class ParForProgramBlock extends ForProgramBlock
                        return _dpf == PDataPartitionFormat.COLUMN_BLOCK_WISE_N 
                                || _dpf == PDataPartitionFormat.ROW_BLOCK_WISE_N;
                }
+               public boolean isRowwise() {
+                       return _dpf == PDataPartitionFormat.ROW_WISE
+                               || _dpf == PDataPartitionFormat.ROW_BLOCK_WISE
+                               || _dpf == PDataPartitionFormat.ROW_BLOCK_WISE_N;
+               }
                public long getNumParts(MatrixCharacteristics mc) {
                        switch( _dpf ) {
                                case ROW_WISE: return mc.getRows();
@@ -227,6 +232,30 @@ public class ParForProgramBlock extends ForProgramBlock
                                        throw new RuntimeException("Unsupported partition format: "+_dpf);
                        }
                }
+               public long getNumRows(MatrixCharacteristics mc) {
+                       switch( _dpf ) {
+                               case ROW_WISE: return 1;
+                               case ROW_BLOCK_WISE: return mc.getRowsPerBlock();
+                               case ROW_BLOCK_WISE_N: return _N;
+                               case COLUMN_WISE: return mc.getRows();
+                               case COLUMN_BLOCK_WISE: return mc.getRows();
+                               case COLUMN_BLOCK_WISE_N: return mc.getRows();
+                               default:
+                                       throw new RuntimeException("Unsupported partition format: "+_dpf);
+                       }
+               }
+               public long getNumColumns(MatrixCharacteristics mc) {
+                       switch( _dpf ) {
+                               case ROW_WISE: return mc.getCols();
+                               case ROW_BLOCK_WISE: return mc.getCols();
+                               case ROW_BLOCK_WISE_N: return mc.getCols();
+                               case COLUMN_WISE: return 1;
+                               case COLUMN_BLOCK_WISE: return mc.getColsPerBlock();
+                               case COLUMN_BLOCK_WISE_N: return _N;
+                               default:
+                                       throw new RuntimeException("Unsupported partition format: "+_dpf);
+                       }
+               }
        }
        
        public enum PDataPartitioner {

http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMapper.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMapper.java
index 6fcfc1a..e26201e 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMapper.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMapper.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.mapred.Reporter;
 import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
 import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableBlock;
 import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableCell;
+import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.IJV;
 import org.apache.sysml.runtime.matrix.data.InputInfo;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
@@ -48,15 +49,11 @@ import org.apache.sysml.runtime.util.MapReduceTool;
  */
 public class DataPartitionerRemoteMapper 
        implements Mapper<Writable, Writable, Writable, Writable>
-{      
-       
+{
        private DataPartitionerMapper _mapper = null;
        
-       public DataPartitionerRemoteMapper( ) 
-       {
-               
-       }
-
+       public DataPartitionerRemoteMapper( ) { }
+       
        @Override
        public void map(Writable key, Writable value, OutputCollector<Writable, Writable> out, Reporter reporter) 
                throws IOException
@@ -67,10 +64,7 @@ public class DataPartitionerRemoteMapper
        @Override
        public void configure(JobConf job)
        {
-               long rlen = MRJobConfiguration.getPartitioningNumRows( job );
-               long clen = MRJobConfiguration.getPartitioningNumCols( job );
-               int brlen = MRJobConfiguration.getPartitioningBlockNumRows( job );
-               int bclen = MRJobConfiguration.getPartitioningBlockNumCols( job );
+               MatrixCharacteristics mc = MRJobConfiguration.getPartitionedMatrixSize(job);
                InputInfo ii = MRJobConfiguration.getPartitioningInputInfo( job );
                OutputInfo oi = MRJobConfiguration.getPartitioningOutputInfo( job );
                PDataPartitionFormat pdf = MRJobConfiguration.getPartitioningFormat( job );
@@ -78,17 +72,21 @@ public class DataPartitionerRemoteMapper
                boolean keepIndexes =  MRJobConfiguration.getPartitioningIndexFlag( job );
                
                if( ii == InputInfo.TextCellInputInfo )
-                       _mapper = new DataPartitionerMapperTextcell(rlen, clen, brlen, bclen, pdf, n);
+                       _mapper = new DataPartitionerMapperTextcell(mc.getRows(), mc.getCols(),
+                               mc.getRowsPerBlock(), mc.getColsPerBlock(), pdf, n);
                else if( ii == InputInfo.BinaryCellInputInfo )
-                       _mapper = new DataPartitionerMapperBinarycell(rlen, clen, brlen, bclen, pdf, n);
+                       _mapper = new DataPartitionerMapperBinarycell(mc.getRows(), mc.getCols(),
+                               mc.getRowsPerBlock(), mc.getColsPerBlock(), pdf, n);
                else if( ii == InputInfo.BinaryBlockInputInfo )
                {
                        if( oi == OutputInfo.BinaryBlockOutputInfo )
-                               _mapper = new DataPartitionerMapperBinaryblock(rlen, clen, brlen, bclen, pdf, n, keepIndexes);
+                               _mapper = new DataPartitionerMapperBinaryblock(mc.getRows(), mc.getCols(),
+                                       mc.getRowsPerBlock(), mc.getColsPerBlock(), pdf, n, keepIndexes);
                        else if( oi == OutputInfo.BinaryCellOutputInfo )
                        {
                                boolean outputEmpty = MRJobConfiguration.getProgramBlocks(job)!=null; //fused parfor
-                               _mapper = new DataPartitionerMapperBinaryblock2Binarycell(job, rlen, clen, brlen, bclen, pdf, n, keepIndexes, outputEmpty); 
+                               _mapper = new DataPartitionerMapperBinaryblock2Binarycell(job, mc.getRows(), mc.getCols(),
+                                       mc.getRowsPerBlock(), mc.getColsPerBlock(), pdf, n, keepIndexes, outputEmpty); 
                        }
                        else
                                throw new RuntimeException("Partitioning from '"+ii+"' to '"+oi+"' not supported");

http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
index d20e7f3..dbc3fbf 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
@@ -87,18 +87,8 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF
                _aIters = aiters;
                
                //setup matrix block partition meta data
-               switch( dpf._dpf ) {
-                       case ROW_WISE: 
-                               _rlen = (int)mc.getRows(); _clen = 1; break;
-                       case ROW_BLOCK_WISE_N:
-                               _rlen = dpf._N; _clen = (int)mc.getCols(); break;
-                       case COLUMN_BLOCK_WISE:
-                               _rlen = 1; _clen = (int)mc.getCols(); break;
-                       case COLUMN_BLOCK_WISE_N:
-                               _rlen = (int)mc.getRows(); _clen = dpf._N; break;
-                       default:
-                               throw new RuntimeException("Unsupported partition format: "+dpf._dpf.name());
-               }
+               _rlen = (int)dpf.getNumRows(mc);
+               _clen = (int)dpf.getNumColumns(mc);
                _brlen = mc.getRowsPerBlock();
                _bclen = mc.getColsPerBlock();
                _tSparseCol = tSparseCol;
@@ -237,8 +227,8 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF
                                int col_offset = (int)(pval.indexes.getColumnIndex()-1)*_bclen;
                                if( !partition.isInSparseFormat() ) //DENSE
                                        partition.copy( row_offset, row_offset+pval.block.getNumRows()-1, 
-                                                          col_offset, col_offset+pval.block.getNumColumns()-1,
-                                                          pval.block, false ); 
+                                               col_offset, col_offset+pval.block.getNumColumns()-1,
+                                               pval.block, false ); 
                                else //SPARSE 
                                        partition.appendToSparse(pval.block, row_offset, col_offset);
                                lnnz += pval.block.getNonZeros();
@@ -250,8 +240,7 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF
                        partition.setNonZeros(lnnz);
                        partition.examSparsity();
                }
-               catch(DMLRuntimeException ex)
-               {
+               catch(DMLRuntimeException ex) {
                        throw new IOException(ex);
                }
                

http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java
index 3f26945..d022ac8 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java
@@ -32,6 +32,7 @@ import org.apache.sysml.api.DMLScript;
 import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
+import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PartitionFormat;
 import org.apache.sysml.runtime.controlprogram.caching.CacheStatistics;
 import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
@@ -42,6 +43,7 @@ import org.apache.sysml.runtime.controlprogram.parfor.util.IDHandler;
 import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableBlock;
 import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableCell;
 import org.apache.sysml.runtime.instructions.cp.IntObject;
+import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.OutputInfo;
 import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
@@ -66,18 +68,15 @@ public class RemoteDPParWorkerReducer extends ParWorker
        //reuse matrix partition
        private MatrixBlock _partition = null; 
        private boolean _tSparseCol = false;
-               
-       //MR ParWorker attributes  
-       protected String  _stringID       = null;
+       
+       //MR ParWorker attributes 
+       protected String _stringID = null;
 
        //cached collector/reporter
        protected OutputCollector<Writable, Writable> _out = null;
        protected Reporter _report = null;
 
-       public RemoteDPParWorkerReducer() 
-       {
-               
-       }
+       public RemoteDPParWorkerReducer() { }
        
        @Override
        public void reduce(LongWritable key, Iterator<Writable> valueList, OutputCollector<Writable, Writable> out, Reporter reporter)
@@ -107,8 +106,7 @@ public class RemoteDPParWorkerReducer extends ParWorker
                        //execute program
                        executeTask( lTask );
                }
-               catch(Exception ex)
-               {
+               catch(Exception ex) {
                        throw new IOException("ParFOR: Failed to execute 
task.",ex);
                }
                
@@ -120,18 +118,15 @@ public class RemoteDPParWorkerReducer extends ParWorker
        public void configure(JobConf job)
        {
                //Step 1: configure data partitioning information
-               _rlen = (int)MRJobConfiguration.getPartitioningNumRows( job );
-               _clen = (int)MRJobConfiguration.getPartitioningNumCols( job );
-               _brlen = MRJobConfiguration.getPartitioningBlockNumRows( job );
-               _bclen = MRJobConfiguration.getPartitioningBlockNumCols( job );
+               _dpf = MRJobConfiguration.getPartitioningFormat( job );
+               MatrixCharacteristics mc = MRJobConfiguration.getPartitionedMatrixSize(job);
+               PartitionFormat pf = new PartitionFormat(_dpf, MRJobConfiguration.getPartitioningSizeN(job));
+               _rlen = (int)pf.getNumRows(mc);
+               _clen = (int)pf.getNumColumns(mc);
+               _brlen = mc.getRowsPerBlock();
+               _bclen = mc.getColsPerBlock();
                _iterVar = MRJobConfiguration.getPartitioningItervar( job );
                _inputVar = MRJobConfiguration.getPartitioningMatrixvar( job );
-               _dpf = MRJobConfiguration.getPartitioningFormat( job );
-               switch( _dpf ) { //create matrix partition for reuse
-                       case ROW_WISE:    _rlen = 1; break;
-                       case COLUMN_WISE: _clen = 1; break;
-                       default:  throw new RuntimeException("Partition format not yet supported in fused partition-execute: "+_dpf);
-               }
                _info = MRJobConfiguration.getPartitioningOutputInfo( job );
                _tSparseCol = MRJobConfiguration.getPartitioningTransposedCol( job ); 
                if( _tSparseCol )

http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForColocatedNLineInputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForColocatedNLineInputFormat.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForColocatedNLineInputFormat.java
index d5cc4bc..a2a231d 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForColocatedNLineInputFormat.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForColocatedNLineInputFormat.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.lib.NLineInputFormat;
 
 import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
+import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PartitionFormat;
+import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
 
 /**
@@ -37,29 +39,21 @@ import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
  */
 public class RemoteParForColocatedNLineInputFormat extends NLineInputFormat
 {
-       
        @Override
        public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException 
-       {       
+       {
                InputSplit[] tmp = super.getSplits(job, numSplits);
                
                //get partitioning information
+               MatrixCharacteristics mc = MRJobConfiguration.getPartitionedMatrixSize(job);
                PDataPartitionFormat dpf = MRJobConfiguration.getPartitioningFormat(job);
-               int blen = -1;
-               switch( dpf ) {
-                       case ROW_WISE:          blen = 1; break;
-                       case ROW_BLOCK_WISE:    blen = MRJobConfiguration.getPartitioningBlockNumRows(job); break;
-                       case COLUMN_WISE:       blen = 1; break;
-                       case COLUMN_BLOCK_WISE: blen = MRJobConfiguration.getPartitioningBlockNumCols(job); break;
-                       default: 
-                               //do nothing
-                       }               
+               PartitionFormat pf = new PartitionFormat(dpf, -1);
+               int blen = (int) (pf.isRowwise() ? pf.getNumRows(mc) : pf.getNumColumns(mc));
                String fname = MRJobConfiguration.getPartitioningFilename(job);
 
                //create wrapper splits 
                InputSplit[] ret = new InputSplit[ tmp.length ];
-               for( int i=0; i<tmp.length; i++ )
-               {
+               for( int i=0; i<tmp.length; i++ ) {
                        //check for robustness of subsequent cast
                        if( tmp[i] instanceof FileSplit ) 
                                ret[i] = new RemoteParForColocatedFileSplit( (FileSplit) tmp[i], fname, blen );

http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
index 9e78c2a..1ca5631 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
@@ -231,19 +231,18 @@ public class OptimizerConstrained extends OptimizerRuleBased
        protected boolean rewriteSetDataPartitioner(OptNode n, LocalVariableMap vars, HashMap<String,PartitionFormat> partitionedMatrices, double thetaM)
                throws DMLRuntimeException
        {
-               boolean blockwise = false;
-
+               //call rewrite first to obtain partitioning information
+               String initPlan = n.getParam(ParamType.DATA_PARTITIONER);
+               boolean blockwise = super.rewriteSetDataPartitioner(n, vars, partitionedMatrices, thetaM);
+               
                // constraint awareness
-               if( !n.getParam(ParamType.DATA_PARTITIONER).equals(PDataPartitioner.UNSPECIFIED.toString()) )
-               {
-                       Object[] o = OptTreeConverter.getAbstractPlanMapping().getMappedProg(n.getID());
-                       ParForProgramBlock pfpb = (ParForProgramBlock) o[1];
-                       pfpb.setDataPartitioner(PDataPartitioner.valueOf(n.getParam(ParamType.DATA_PARTITIONER)));
-                       LOG.debug(getOptMode()+" OPT: forced 'set data partitioner' - result="+n.getParam(ParamType.DATA_PARTITIONER) );
+               if( !initPlan.equals(PDataPartitioner.UNSPECIFIED.name()) ) {
+                       ParForProgramBlock pfpb = (ParForProgramBlock) OptTreeConverter
+                               .getAbstractPlanMapping().getMappedProg(n.getID())[1];
+                       pfpb.setDataPartitioner(PDataPartitioner.valueOf(initPlan));
+                       LOG.debug(getOptMode()+" OPT: forced 'set data partitioner' - result=" + initPlan );
                }
-               else
-                       super.rewriteSetDataPartitioner(n, vars, partitionedMatrices, thetaM);
-
+               
                return blockwise;
        }
 

http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
index 154109a..9456703 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
@@ -100,7 +100,6 @@ import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyze
 import org.apache.sysml.runtime.instructions.Instruction;
 import org.apache.sysml.runtime.instructions.cp.Data;
 import org.apache.sysml.runtime.instructions.cp.FunctionCallCPInstruction;
-import org.apache.sysml.runtime.instructions.gpu.context.GPUContext;
 import org.apache.sysml.runtime.instructions.gpu.context.GPUContextPool;
 import org.apache.sysml.runtime.instructions.spark.data.RDDObject;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
@@ -1505,7 +1504,7 @@ public class OptimizerRuleBased extends Optimizer
                        && partitionedMatrices.size()==1 ) //only one 
partitioned matrix
                {
                        ParForProgramBlock pfpb = (ParForProgramBlock) 
OptTreeConverter
-                         
.getAbstractPlanMapping().getMappedProg(pn.getID())[1];
+                               
.getAbstractPlanMapping().getMappedProg(pn.getID())[1];
                        
                        //partitioned matrix
                        String moVarname = 
partitionedMatrices.keySet().iterator().next();

http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java b/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java
index 88512b6..5cd5daf 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java
@@ -563,61 +563,42 @@ public class MRJobConfiguration
                
                //set transpose sparse column vector
                job.setBoolean(PARTITIONING_OUTPUT_KEEP_INDEXES_CONFIG, keepIndexes);
-                               
        }
        
-       public static long getPartitioningNumRows( JobConf job )
-       {
-               return Long.parseLong(job.get(PARTITIONING_INPUT_MATRIX_NUM_ROW_CONFIG));
+       public static MatrixCharacteristics getPartitionedMatrixSize(JobConf job) {
+               return new MatrixCharacteristics(
+                       Long.parseLong(job.get(PARTITIONING_INPUT_MATRIX_NUM_ROW_CONFIG)),
+                       Long.parseLong(job.get(PARTITIONING_INPUT_MATRIX_NUM_COLUMN_CONFIG)),
+                       Integer.parseInt(job.get(PARTITIONING_INPUT_BLOCK_NUM_ROW_CONFIG)),
+                       Integer.parseInt(job.get(PARTITIONING_INPUT_BLOCK_NUM_COLUMN_CONFIG)));
        }
        
-       public static long getPartitioningNumCols( JobConf job )
-       {
-               return Long.parseLong(job.get(PARTITIONING_INPUT_MATRIX_NUM_COLUMN_CONFIG));
-       }
        
-       public static void setPartitioningBlockNumRows( JobConf job, int brlen )
-       {
+       public static void setPartitioningBlockNumRows( JobConf job, int brlen ) {
                job.set(PARTITIONING_INPUT_BLOCK_NUM_ROW_CONFIG, String.valueOf(brlen));
        }
        
-       public static int getPartitioningBlockNumRows( JobConf job )
-       {
-               return Integer.parseInt(job.get(PARTITIONING_INPUT_BLOCK_NUM_ROW_CONFIG));
-       }
-
-       public static void setPartitioningBlockNumCols( JobConf job, int bclen )
-       {
+       public static void setPartitioningBlockNumCols( JobConf job, int bclen ) {
                job.set(PARTITIONING_INPUT_BLOCK_NUM_COLUMN_CONFIG,String.valueOf(bclen));
        }
        
-       public static int getPartitioningBlockNumCols( JobConf job )
-       {
-               return Integer.parseInt(job.get(PARTITIONING_INPUT_BLOCK_NUM_COLUMN_CONFIG));
-       }
-       
-       public static InputInfo getPartitioningInputInfo( JobConf job )
-       {
+       public static InputInfo getPartitioningInputInfo( JobConf job ) {
                return InputInfo.stringToInputInfo(job.get(PARTITIONING_INPUT_INFO_CONFIG));
        }
        
-       public static OutputInfo getPartitioningOutputInfo( JobConf job )
-       {
+       public static OutputInfo getPartitioningOutputInfo( JobConf job ) {
                return OutputInfo.stringToOutputInfo(job.get(PARTITIONING_OUTPUT_INFO_CONFIG));
        }
-
-       public static void setPartitioningFormat( JobConf job, PDataPartitionFormat dpf )
-       {
+       
+       public static void setPartitioningFormat( JobConf job, PDataPartitionFormat dpf ) {
                job.set(PARTITIONING_OUTPUT_FORMAT_CONFIG, dpf.toString());
        }
        
-       public static PDataPartitionFormat getPartitioningFormat( JobConf job )
-       {
+       public static PDataPartitionFormat getPartitioningFormat( JobConf job ) {
                return PDataPartitionFormat.valueOf(job.get(PARTITIONING_OUTPUT_FORMAT_CONFIG));
        }
        
-       public static int getPartitioningSizeN( JobConf job )
-       {
+       public static int getPartitioningSizeN( JobConf job ) {
                return Integer.parseInt(job.get(PARTITIONING_OUTPUT_N_CONFIG));
        }
        

http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForDataPartitionExecuteTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForDataPartitionExecuteTest.java b/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForDataPartitionExecuteTest.java
new file mode 100644
index 0000000..80c7568
--- /dev/null
+++ b/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForDataPartitionExecuteTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.parfor;
+
+import java.util.HashMap;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
+import org.apache.sysml.lops.LopProperties.ExecType;
+import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
+import org.apache.sysml.runtime.matrix.data.MatrixValue.CellIndex;
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.integration.TestConfiguration;
+import org.apache.sysml.test.utils.TestUtils;
+
+public class ParForDataPartitionExecuteTest extends AutomatedTestBase 
+{
+       private final static String TEST_NAME = "DataPartitionExecute";
+       private final static String TEST_DIR = "functions/parfor/";
+       private final static String TEST_CLASS_DIR = TEST_DIR + ParForDataPartitionExecuteTest.class.getSimpleName() + "/";
+       private final static double eps = 1e-10;
+       
+       private final static int dim1 = 2001;
+       private final static int dim2 = 101;
+       private final static double sparsity1 = 0.7;
+       private final static double sparsity2 = 0.3d;
+       
+       @Override
+       public void setUp() {
+               addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[]{"R"}));
+       }
+       
+       @Test
+       public void testFusedDataPartitionExecuteRowDenseMR() {
+               runFusedDataPartitionExecuteTest(false, true, ExecType.MR);
+       }
+       
+       @Test
+       public void testFusedDataPartitionExecuteColDenseMR() {
+               runFusedDataPartitionExecuteTest(false, false, ExecType.MR);
+       }
+       
+       @Test
+       public void testFusedDataPartitionExecuteRowSparseMR() {
+               runFusedDataPartitionExecuteTest(true, true, ExecType.MR);
+       }
+       
+       @Test
+       public void testFusedDataPartitionExecuteColSparseMR() {
+               runFusedDataPartitionExecuteTest(true, false, ExecType.MR);
+       }
+       
+       @Test
+       public void testFusedDataPartitionExecuteRowDenseSpark() {
+               runFusedDataPartitionExecuteTest(false, true, ExecType.SPARK);
+       }
+       
+       @Test
+       public void testFusedDataPartitionExecuteColDenseSpark() {
+               runFusedDataPartitionExecuteTest(false, false, ExecType.SPARK);
+       }
+       
+       @Test
+       public void testFusedDataPartitionExecuteRowSparseSpark() {
+               runFusedDataPartitionExecuteTest(true, true, ExecType.SPARK);
+       }
+       
+       @Test
+       public void testFusedDataPartitionExecuteColSparseSpark() {
+               runFusedDataPartitionExecuteTest(true, false, ExecType.SPARK);
+       }
+       
+       private void runFusedDataPartitionExecuteTest(boolean sparse, boolean row, ExecType et)
+       {
+               RUNTIME_PLATFORM platformOld = rtplatform;
+               switch( et ){
+                       case MR: rtplatform = RUNTIME_PLATFORM.HYBRID; break;
+                       case SPARK: rtplatform = RUNTIME_PLATFORM.HYBRID_SPARK; break;
+                       default: throw new RuntimeException("Unsupported exec type: "+et.name());
+               }
+               boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+               if( et == ExecType.SPARK )
+                       DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+               
+               //modify memory budget to trigger fused datapartition-execute
+               long oldmem = InfrastructureAnalyzer.getLocalMaxMemory();
+               InfrastructureAnalyzer.setLocalMaxMemory(1*1024*1024); //1MB
+               
+               try
+               {
+                       int rows = row ? dim2 : dim1;
+                       int cols = row ? dim1 : dim2;
+                       
+                       TestConfiguration config = getTestConfiguration(TEST_NAME);
+                       config.addVariable("rows", rows);
+                       config.addVariable("cols", cols);
+                       loadTestConfiguration(config);
+                       
+                       String HOME = SCRIPT_DIR + TEST_DIR;
+                       fullDMLScriptName = HOME + TEST_NAME + ".dml";
+                       programArgs = new String[]{"-stats", "-args", input("X"), 
+                               String.valueOf(et == ExecType.SPARK).toUpperCase(),
+                               String.valueOf(row).toUpperCase(), output("R") };
+                       
+                       fullRScriptName = HOME + TEST_NAME + ".R";
+                       rCmd = "Rscript" + " " + fullRScriptName + " " + inputDir() 
+                               + " " + String.valueOf(row).toUpperCase() + " " + expectedDir();
+                       
+                       //generate input data
+                       double sparsity = sparse ? sparsity2 : sparsity1;
+                       double[][] X = getRandomMatrix(rows, cols, 0, 1, sparsity, 7);
+                       writeInputMatrixWithMTD("X", X, true);
+                       
+                       //run test case
+                       runTest(true, false, null, -1);
+                       runRScript(true);
+                       
+                       //compare matrices
+                       HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromHDFS("R");
+                       HashMap<CellIndex, Double> rfile  = readRMatrixFromFS("R");
+                       TestUtils.compareMatrices(dmlfile, rfile, eps, "DML", "R");
+                       
+                       //check for compiled datapartition-execute
+                       Assert.assertTrue(heavyHittersContainsSubString(
+                               (et == ExecType.SPARK) ? "ParFor-DPESP" : "MR-Job_ParFor-DPEMR"));
+               }
+               finally {
+                       rtplatform = platformOld;
+                       DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
+                       InfrastructureAnalyzer.setLocalMaxMemory(oldmem); //restore original budget
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/test/scripts/functions/parfor/DataPartitionExecute.R
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/parfor/DataPartitionExecute.R b/src/test/scripts/functions/parfor/DataPartitionExecute.R
new file mode 100644
index 0000000..8534606
--- /dev/null
+++ b/src/test/scripts/functions/parfor/DataPartitionExecute.R
@@ -0,0 +1,41 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+
+args <- commandArgs(TRUE)
+options(digits=22)
+
+library("Matrix")
+
+X <- as.matrix(readMM(paste(args[1], "X.mtx", sep="")))
+R = matrix(0, nrow(X), ncol(X)); 
+
+if( as.logical(args[2]) ) {
+   for(i in 1:nrow(X)) {
+      R[i,] = pmin(X[i,], sum(X[i,]));
+   }
+} else {
+   for(i in 1:ncol(X)) {
+      R[,i] = pmin(X[,i], sum(X[,i]));
+   }
+}
+
+writeMM(as(R, "CsparseMatrix"), paste(args[3], "R", sep=""));

http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/test/scripts/functions/parfor/DataPartitionExecute.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/parfor/DataPartitionExecute.dml b/src/test/scripts/functions/parfor/DataPartitionExecute.dml
new file mode 100644
index 0000000..0fd2299
--- /dev/null
+++ b/src/test/scripts/functions/parfor/DataPartitionExecute.dml
@@ -0,0 +1,42 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+X = read($1);
+R = matrix(0, nrow(X), ncol(X));
+
+if( $2 ) {
+  if( $3 )
+     parfor(i in 1:nrow(X), opt=CONSTRAINED, mode=REMOTE_SPARK_DP, datapartitioner=REMOTE_SPARK)
+        R[i,] = min(X[i,], sum(X[i,]));
+  else
+     parfor(i in 1:ncol(X), opt=CONSTRAINED, mode=REMOTE_SPARK_DP, datapartitioner=REMOTE_SPARK)
+        R[,i] = min(X[,i], sum(X[,i]));
+}
+else {
+  if( $3 )
+     parfor(i in 1:nrow(X), opt=CONSTRAINED, mode=REMOTE_MR_DP, datapartitioner=REMOTE_MR)
+        R[i,] = min(X[i,], sum(X[i,]));
+  else
+     parfor(i in 1:ncol(X), opt=CONSTRAINED, mode=REMOTE_MR_DP, datapartitioner=REMOTE_MR)
+        R[,i] = min(X[,i], sum(X[,i]));
+}
+
+write(R, $4);

http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/test_suites/java/org/apache/sysml/test/integration/functions/parfor/ZPackageSuite.java
----------------------------------------------------------------------
diff --git a/src/test_suites/java/org/apache/sysml/test/integration/functions/parfor/ZPackageSuite.java b/src/test_suites/java/org/apache/sysml/test/integration/functions/parfor/ZPackageSuite.java
index 976b23f..c898a7a 100644
--- a/src/test_suites/java/org/apache/sysml/test/integration/functions/parfor/ZPackageSuite.java
+++ b/src/test_suites/java/org/apache/sysml/test/integration/functions/parfor/ZPackageSuite.java
@@ -30,6 +30,7 @@ import org.junit.runners.Suite;
        ParForAdversarialLiteralsTest.class,
        ParForBlockwiseDataPartitioningTest.class,
        ParForColwiseDataPartitioningTest.class,
+       ParForDataPartitionExecuteTest.class,
        ParForDataPartitionLeftIndexingTest.class,
        ParForDependencyAnalysisTest.class,
        ParForFunctionSerializationTest.class,
