[SYSTEMML-2096] Extended spark/mr parfor result merge w/ accumulators

This patch completes the runtime integration of parfor result merge with
accumulators (for the += operator) across all distributed MR and Spark
result merge operations, and adds related tests.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/2ef6342f
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/2ef6342f
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/2ef6342f

Branch: refs/heads/master
Commit: 2ef6342fa013a64cdc0a5d0aa667e60aeeecaa84
Parents: 4749851
Author: Matthias Boehm <[email protected]>
Authored: Sat Jan 27 22:50:41 2018 -0800
Committer: Matthias Boehm <[email protected]>
Committed: Sat Jan 27 22:50:41 2018 -0800

----------------------------------------------------------------------
 .../controlprogram/parfor/ResultMerge.java      |  7 ++-
 .../parfor/ResultMergeLocalAutomatic.java       |  2 +
 .../parfor/ResultMergeLocalFile.java            | 14 +++---
 .../parfor/ResultMergeLocalMemory.java          |  2 +
 .../parfor/ResultMergeRemoteMR.java             |  8 ++--
 .../parfor/ResultMergeRemoteReducer.java        | 16 ++++---
 .../parfor/ResultMergeRemoteSpark.java          | 30 ++++++-------
 .../parfor/ResultMergeRemoteSparkWCompare.java  |  4 ++
 .../matrix/mapred/MRJobConfiguration.java       | 11 +++--
 .../ParForAccumulatorResultMergeTest.java       | 47 ++++++++++++++++++--
 .../functions/parfor/parfor_accumulator.R       | 39 ++++++++++++++++
 .../functions/parfor/parfor_accumulator1.R      | 39 ----------------
 .../functions/parfor/parfor_accumulator2.dml    | 34 ++++++++++++++
 .../functions/parfor/parfor_accumulator3.dml    | 34 ++++++++++++++
 14 files changed, 207 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/2ef6342f/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMerge.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMerge.java 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMerge.java
index 5e814fa..7d43475 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMerge.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMerge.java
@@ -19,6 +19,7 @@
 
 package org.apache.sysml.runtime.controlprogram.parfor;
 
+import java.io.Serializable;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -37,8 +38,12 @@ import 
org.apache.sysml.runtime.matrix.operators.BinaryOperator;
  * These properties allow us to realize result merging in parallel without any 
synchronization. 
  * 
  */
-public abstract class ResultMerge 
+public abstract class ResultMerge implements Serializable
 {
+       //note: this class needs to be serializable to ensure that all 
attributes of
+       //ResultMergeRemoteSparkWCompare are included in the task closure
+       private static final long serialVersionUID = 2620430969346516677L;
+       
        protected static final Log LOG = 
LogFactory.getLog(ResultMerge.class.getName());
        protected static final String NAME_SUFFIX = "_rm";
        protected static final BinaryOperator PLUS = 
InstructionUtils.parseBinaryOperator("+");

http://git-wip-us.apache.org/repos/asf/systemml/blob/2ef6342f/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalAutomatic.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalAutomatic.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalAutomatic.java
index e9d178a..135cf9c 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalAutomatic.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalAutomatic.java
@@ -29,6 +29,8 @@ import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 
 public class ResultMergeLocalAutomatic extends ResultMerge
 {
+       private static final long serialVersionUID = 1600893100602101732L;
+       
        private ResultMerge _rm = null;
        
        public ResultMergeLocalAutomatic( MatrixObject out, MatrixObject[] in, 
String outputFilename, boolean accum ) {

http://git-wip-us.apache.org/repos/asf/systemml/blob/2ef6342f/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java
index 049dbf1..6f987b2 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java
@@ -71,6 +71,8 @@ import org.apache.sysml.runtime.util.MapReduceTool;
  */
 public class ResultMergeLocalFile extends ResultMerge
 {
+       private static final long serialVersionUID = -6905893742840020489L;
+
        //NOTE: if we allow simple copies, this might result in a scattered 
file and many MR tasks for subsequent jobs
        public static final boolean ALLOW_COPY_CELLFILES = false;
        
@@ -687,7 +689,7 @@ public class ResultMergeLocalFile extends ResultMerge
                                                        }
                                                        
                                                        //sort sparse due to 
append-only
-                                                       if( appendOnly )
+                                                       if( appendOnly && 
!_isAccum )
                                                                
mb.sortSparseRows();
                                                        
                                                        //change sparsity if 
required after 
@@ -709,7 +711,7 @@ public class ResultMergeLocalFile extends ResultMerge
                                                        }
                                                        
                                                        //sort sparse due to 
append-only
-                                                       if( appendOnly )
+                                                       if( appendOnly && 
!_isAccum )
                                                                
mb.sortSparseRows();
                                                        
                                                        //change sparsity if 
required after 
@@ -779,7 +781,7 @@ public class ResultMergeLocalFile extends ResultMerge
                                                        }
                                                        
                                                        //sort sparse and exam 
sparsity due to append-only
-                                                       if( appendOnly )
+                                                       if( appendOnly && 
!_isAccum )
                                                                
mb.sortSparseRows();
                                                        
                                                        //change sparsity if 
required after 
@@ -801,7 +803,7 @@ public class ResultMergeLocalFile extends ResultMerge
                                                        }       
                                                        
                                                        //sort sparse due to 
append-only
-                                                       if( appendOnly )
+                                                       if( appendOnly && 
!_isAccum )
                                                                
mb.sortSparseRows();
                                                        
                                                        //change sparsity if 
required after 
@@ -903,7 +905,7 @@ public class ResultMergeLocalFile extends ResultMerge
                                                        }
                                                        
                                                        //sort sparse due to 
append-only
-                                                       if( appendOnly )
+                                                       if( appendOnly && 
!_isAccum )
                                                                
mb.sortSparseRows();
                                                        
                                                        //change sparsity if 
required after 
@@ -925,7 +927,7 @@ public class ResultMergeLocalFile extends ResultMerge
                                                        }
                                                        
                                                        //sort sparse due to 
append-only
-                                                       if( appendOnly )
+                                                       if( appendOnly && 
!_isAccum )
                                                                
mb.sortSparseRows();
                                                        
                                                        //change sparsity if 
required after 

http://git-wip-us.apache.org/repos/asf/systemml/blob/2ef6342f/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java
index a59e81c..b2f5907 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java
@@ -42,6 +42,8 @@ import org.apache.sysml.runtime.util.DataConverter;
  */
 public class ResultMergeLocalMemory extends ResultMerge
 {
+       private static final long serialVersionUID = -3543612508601511701L;
+       
        //internal comparison matrix
        private DenseBlock _compare = null;
        

http://git-wip-us.apache.org/repos/asf/systemml/blob/2ef6342f/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
index 4b1bb0e..c20f0db 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
@@ -58,7 +58,8 @@ import org.apache.sysml.utils.Statistics;
  * 
  */
 public class ResultMergeRemoteMR extends ResultMerge
-{      
+{
+       private static final long serialVersionUID = 575681838941682037L;
        
        public static final byte COMPARE_TAG = 'c';
        public static final byte DATA_TAG = 'd';
@@ -185,11 +186,12 @@ public class ResultMergeRemoteMR extends ResultMerge
                        if( withCompare ) {
                                FileSystem fs = 
IOUtilFunctions.getFileSystem(pathNew, job);
                                pathCompare = new Path(fname).makeQualified(fs);
-                               MRJobConfiguration.setResultMergeInfo(job, 
pathCompare.toString(), ii, 
+                               MRJobConfiguration.setResultMergeInfo(job, 
pathCompare.toString(), _isAccum, ii, 
                                        
LocalFileUtils.getWorkingDir(LocalFileUtils.CATEGORY_RESULTMERGE), rlen, clen, 
brlen, bclen);
                        }
                        else
-                               MRJobConfiguration.setResultMergeInfo(job, 
"null", ii, LocalFileUtils.getWorkingDir(LocalFileUtils.CATEGORY_RESULTMERGE), 
rlen, clen, bclen, bclen);
+                               MRJobConfiguration.setResultMergeInfo(job, 
"null", _isAccum, ii,
+                                       
LocalFileUtils.getWorkingDir(LocalFileUtils.CATEGORY_RESULTMERGE), rlen, clen, 
brlen, bclen);
                        
                        
                        //set mappers, reducers, combiners

http://git-wip-us.apache.org/repos/asf/systemml/blob/2ef6342f/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteReducer.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteReducer.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteReducer.java
index e12a052..6fa6534 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteReducer.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteReducer.java
@@ -74,18 +74,17 @@ public class ResultMergeRemoteReducer
                String compareFname = 
MRJobConfiguration.getResultMergeInfoCompareFilename(job);
                
                //determine compare required
-               boolean requiresCompare = false;
-               if( !compareFname.equals("null") )
-                       requiresCompare = true;
+               boolean requiresCompare = !compareFname.equals("null");
+               boolean isAccum = 
MRJobConfiguration.getResultMergeInfoAccumulator(job);
                
                if( ii == InputInfo.TextCellInputInfo )
                        _reducer = new 
ResultMergeReducerTextCell(requiresCompare);
                else if( ii == InputInfo.BinaryCellInputInfo )
                        _reducer = new 
ResultMergeReducerBinaryCell(requiresCompare);
                else if( ii == InputInfo.BinaryBlockInputInfo )
-                       _reducer = new 
ResultMergeReducerBinaryBlock(requiresCompare, job);
+                       _reducer = new 
ResultMergeReducerBinaryBlock(requiresCompare, isAccum, job);
                else
-                       throw new RuntimeException("Unable to configure mapper 
with unknown input info: "+ii.toString());
+                       throw new RuntimeException("Unable to configure mapper 
with unknown input info: "+ii.toString()+" "+isAccum);
        }
 
        @Override
@@ -266,12 +265,15 @@ public class ResultMergeRemoteReducer
        
        private static class ResultMergeReducerBinaryBlock extends ResultMerge 
implements ResultMergeReducer
        {
+               private static final long serialVersionUID = 84399890805869855L;
+               
                private boolean _requiresCompare;
                private JobConf _job = null;
                
-               public ResultMergeReducerBinaryBlock(boolean requiresCompare, 
JobConf job) {
+               public ResultMergeReducerBinaryBlock(boolean requiresCompare, 
boolean isAccum, JobConf job) {
                        _requiresCompare = requiresCompare;
                        _job = job;
+                       _isAccum = isAccum;
                }
                
                @Override
@@ -323,7 +325,7 @@ public class ResultMergeRemoteReducer
                                }
                                
                                //sort sparse due to append-only
-                               if( appendOnly )
+                               if( appendOnly && !_isAccum )
                                        mbOut.sortSparseRows();
                                
                                //change sparsity if required after 

http://git-wip-us.apache.org/repos/asf/systemml/blob/2ef6342f/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java
index 804e490..a68c23e 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java
@@ -43,12 +43,9 @@ import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.data.OutputInfo;
 import org.apache.sysml.utils.Statistics;
 
-/**
- * MR job class for submitting parfor result merge MR jobs.
- * 
- */
 public class ResultMergeRemoteSpark extends ResultMerge
-{      
+{
+       private static final long serialVersionUID = -6924566953903424820L;
        
        private ExecutionContext _ec = null;
        private int  _numMappers = -1;
@@ -162,23 +159,24 @@ public class ResultMergeRemoteSpark extends ResultMerge
                        
                        //Step 2a: merge with compare
                        JavaPairRDD<MatrixIndexes, MatrixBlock> out = null;
-                       if( withCompare )
-                       {
+                       if( withCompare ) {
                                JavaPairRDD<MatrixIndexes, MatrixBlock> 
compareRdd = (JavaPairRDD<MatrixIndexes, MatrixBlock>) 
                                                
sec.getRDDHandleForMatrixObject(compare, InputInfo.BinaryBlockInputInfo);
-                       
+                               
                                //merge values which differ from compare values
-                               ResultMergeRemoteSparkWCompare cfun = new 
ResultMergeRemoteSparkWCompare();
+                               ResultMergeRemoteSparkWCompare cfun = new 
ResultMergeRemoteSparkWCompare(_isAccum);
                                out = rdd.groupByKey(numRed) //group all result 
blocks per key
-                                       .join(compareRdd)   //join compare 
block and result blocks 
-                                       .mapToPair(cfun);   //merge result 
blocks w/ compare
+                                       .join(compareRdd)        //join compare 
block and result blocks 
+                                       .mapToPair(cfun);        //merge result 
blocks w/ compare
                        }
                        //Step 2b: merge without compare
                        else {
                                //direct merge in any order (disjointness 
guaranteed)
-                               out = RDDAggregateUtils.mergeByKey(rdd, false);
+                               out = _isAccum ?
+                                       RDDAggregateUtils.sumByKeyStable(rdd, 
false) :
+                                       RDDAggregateUtils.mergeByKey(rdd, 
false);
                        }
-                   
+                       
                        //Step 3: create output rdd handle w/ lineage
                        ret = new RDDObject(out);
                        for(int i=0; i<paths.length; i++)
@@ -188,7 +186,7 @@ public class ResultMergeRemoteSpark extends ResultMerge
                }
                catch( Exception ex ) {
                        throw new DMLRuntimeException(ex);
-               }           
+               }
                
                //maintain statistics
                Statistics.incrementNoOfCompiledSPInst();
@@ -203,9 +201,7 @@ public class ResultMergeRemoteSpark extends ResultMerge
        private static int determineNumReducers(long rlen, long clen, int 
brlen, int bclen, long numRed) {
                //set the number of mappers and reducers 
                long reducerGroups = Math.max(rlen/brlen,1) * 
Math.max(clen/bclen, 1);
-               int ret = (int)Math.min( numRed, reducerGroups );
-               
-               return ret;     
+               return (int)Math.min( numRed, reducerGroups );
        }
        
        @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/systemml/blob/2ef6342f/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSparkWCompare.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSparkWCompare.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSparkWCompare.java
index 27cc894..9a755b2 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSparkWCompare.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSparkWCompare.java
@@ -36,6 +36,10 @@ public class ResultMergeRemoteSparkWCompare extends 
ResultMerge implements PairF
 {
        private static final long serialVersionUID = -5970805069405942836L;
        
+       public ResultMergeRemoteSparkWCompare(boolean accum) {
+               _isAccum = accum;
+       }
+       
        @Override
        public Tuple2<MatrixIndexes, MatrixBlock> call(Tuple2<MatrixIndexes, 
Tuple2<Iterable<MatrixBlock>, MatrixBlock>> arg)
                throws Exception 

http://git-wip-us.apache.org/repos/asf/systemml/blob/2ef6342f/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java 
b/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java
index 378d1ef..1c233fe 100644
--- 
a/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java
+++ 
b/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java
@@ -160,6 +160,7 @@ public class MRJobConfiguration
        //result merge info
        private static final String 
RESULTMERGE_INPUT_INFO_CONFIG="resultmerge.input.inputinfo";
        private static final String 
RESULTMERGE_COMPARE_FILENAME_CONFIG="resultmerge.compare.filename";
+       private static final String 
RESULTMERGE_ACCUMULATOR_CONFIG="resultmerge.accumulator";
        private static final String 
RESULTMERGE_STAGING_DIR_CONFIG="resultmerge.staging.dir";
        private static final String 
RESULTMERGE_MATRIX_NUM_ROW_CONFIG="resultmerge.matrix.num.row";
        private static final String 
RESULTMERGE_MATRIX_NUM_COLUMN_CONFIG="resultmerge.matrix.num.column";
@@ -632,10 +633,11 @@ public class MRJobConfiguration
                return job.getBoolean(PARTITIONING_TRANSPOSE_COL_CONFIG, false);
        }
        
-       public static void setResultMergeInfo( JobConf job, String fnameNew, 
InputInfo ii, String stagingDir, long rlen, long clen, int brlen, int bclen )
+       public static void setResultMergeInfo( JobConf job, String fnameNew, 
boolean accum, InputInfo ii, String stagingDir, long rlen, long clen, int 
brlen, int bclen )
                throws DMLRuntimeException
        {
                job.set(RESULTMERGE_COMPARE_FILENAME_CONFIG, fnameNew);
+               job.set(RESULTMERGE_ACCUMULATOR_CONFIG, String.valueOf(accum));
                job.set(RESULTMERGE_INPUT_INFO_CONFIG, 
InputInfo.inputInfoToString(ii));
                job.set(RESULTMERGE_STAGING_DIR_CONFIG, stagingDir);
                job.set(RESULTMERGE_MATRIX_NUM_ROW_CONFIG, 
String.valueOf(rlen));
@@ -644,11 +646,14 @@ public class MRJobConfiguration
                job.set(RESULTMERGE_BLOCK_NUM_COLUMN_CONFIG, 
String.valueOf(bclen));
        }
        
-       public static String getResultMergeInfoCompareFilename( JobConf job )
-       {
+       public static String getResultMergeInfoCompareFilename( JobConf job ) {
                return job.get(RESULTMERGE_COMPARE_FILENAME_CONFIG);
        }
        
+       public static boolean getResultMergeInfoAccumulator( JobConf job ) {
+               return 
Boolean.parseBoolean(job.get(RESULTMERGE_ACCUMULATOR_CONFIG));
+       }
+       
        public static InputInfo getResultMergeInputInfo( JobConf job )
        {
                return InputInfo.stringToInputInfo( 
job.get(RESULTMERGE_INPUT_INFO_CONFIG) );

http://git-wip-us.apache.org/repos/asf/systemml/blob/2ef6342f/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForAccumulatorResultMergeTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForAccumulatorResultMergeTest.java
 
b/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForAccumulatorResultMergeTest.java
index 7936c26..3ce5f73 100644
--- 
a/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForAccumulatorResultMergeTest.java
+++ 
b/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForAccumulatorResultMergeTest.java
@@ -70,6 +70,46 @@ public class ParForAccumulatorResultMergeTest extends 
AutomatedTestBase
        public void testParForAccumulatorLocalInitSparse() {
                runParForAccumulatorResultMergeTest(TEST_NAME1, true, true, 
ExecType.CP);
        }
+       
+       @Test
+       public void testParForAccumulatorRemoteEmptyDenseMR() {
+               runParForAccumulatorResultMergeTest(TEST_NAME2, false, false, 
ExecType.MR);
+       }
+       
+       @Test
+       public void testParForAccumulatorRemoteEmptySparseMR() {
+               runParForAccumulatorResultMergeTest(TEST_NAME2, false, true, 
ExecType.MR);
+       }
+       
+       @Test
+       public void testParForAccumulatorRemoteInitDenseMR() {
+               runParForAccumulatorResultMergeTest(TEST_NAME2, true, false, 
ExecType.MR);
+       }
+       
+       @Test
+       public void testParForAccumulatorRemoteInitSparseMR() {
+               runParForAccumulatorResultMergeTest(TEST_NAME2, true, true, 
ExecType.MR);
+       }
+       
+       @Test
+       public void testParForAccumulatorRemoteEmptyDenseSP() {
+               runParForAccumulatorResultMergeTest(TEST_NAME3, false, false, 
ExecType.SPARK);
+       }
+       
+       @Test
+       public void testParForAccumulatorRemoteEmptySparseSP() {
+               runParForAccumulatorResultMergeTest(TEST_NAME3, false, true, 
ExecType.SPARK);
+       }
+       
+       @Test
+       public void testParForAccumulatorRemoteInitDenseSP() {
+               runParForAccumulatorResultMergeTest(TEST_NAME3, true, false, 
ExecType.SPARK);
+       }
+       
+       @Test
+       public void testParForAccumulatorRemoteInitSparseSP() {
+               runParForAccumulatorResultMergeTest(TEST_NAME3, true, true, 
ExecType.SPARK);
+       }
 
        private void runParForAccumulatorResultMergeTest( String test, boolean 
init, boolean sparse, ExecType et )
        {
@@ -96,16 +136,15 @@ public class ParForAccumulatorResultMergeTest extends 
AutomatedTestBase
                        programArgs = new String[]{"-args", 
                                String.valueOf(rows), String.valueOf(cols), 
String.valueOf(init).toUpperCase(),
                                String.valueOf(sparse).toUpperCase(), 
output("R") };
-                       
-                       fullRScriptName = HOME + TEST_NAME + ".R";
+                       fullRScriptName = HOME + TEST_NAME.substring(0, 
TEST_NAME.length()-1) + ".R";
                        rCmd = "Rscript" + " " + fullRScriptName + " " + 
                                String.valueOf(rows) + " " + 
String.valueOf(cols) + " " + String.valueOf(init).toUpperCase() 
                                        + " " + 
String.valueOf(sparse).toUpperCase() + " " + expectedDir();
-       
+                       
                        //run tests
                        runTest(true, false, null, -1);
                        runRScript(true);
-               
+                       
                        //compare matrices
                        HashMap<CellIndex, Double> dmlfile = 
readDMLMatrixFromHDFS("R");
                        HashMap<CellIndex, Double> rfile  = 
readRMatrixFromFS("R");

http://git-wip-us.apache.org/repos/asf/systemml/blob/2ef6342f/src/test/scripts/functions/parfor/parfor_accumulator.R
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/parfor/parfor_accumulator.R 
b/src/test/scripts/functions/parfor/parfor_accumulator.R
new file mode 100644
index 0000000..a1c587c
--- /dev/null
+++ b/src/test/scripts/functions/parfor/parfor_accumulator.R
@@ -0,0 +1,39 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+
+args <- commandArgs(TRUE)
+options(digits=22)
+
+library("Matrix")
+
+rlen = as.integer(args[1]);
+clen = as.integer(args[2]);
+
+R = matrix(ifelse(as.logical(args[3]), 7, 0), rlen, clen);
+if( as.logical(args[4]) ) {
+   R[,50:300] = matrix(0, rlen, 251);
+}
+for(i in 1:10) {
+   R = R + matrix(i, rlen, clen);
+}
+
+writeMM(as(R, "CsparseMatrix"), paste(args[5], "R", sep=""));

http://git-wip-us.apache.org/repos/asf/systemml/blob/2ef6342f/src/test/scripts/functions/parfor/parfor_accumulator1.R
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/parfor/parfor_accumulator1.R 
b/src/test/scripts/functions/parfor/parfor_accumulator1.R
deleted file mode 100644
index a1c587c..0000000
--- a/src/test/scripts/functions/parfor/parfor_accumulator1.R
+++ /dev/null
@@ -1,39 +0,0 @@
-#-------------------------------------------------------------
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-#-------------------------------------------------------------
-
-
-args <- commandArgs(TRUE)
-options(digits=22)
-
-library("Matrix")
-
-rlen = as.integer(args[1]);
-clen = as.integer(args[2]);
-
-R = matrix(ifelse(as.logical(args[3]), 7, 0), rlen, clen);
-if( as.logical(args[4]) ) {
-   R[,50:300] = matrix(0, rlen, 251);
-}
-for(i in 1:10) {
-   R = R + matrix(i, rlen, clen);
-}
-
-writeMM(as(R, "CsparseMatrix"), paste(args[5], "R", sep=""));

http://git-wip-us.apache.org/repos/asf/systemml/blob/2ef6342f/src/test/scripts/functions/parfor/parfor_accumulator2.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/parfor/parfor_accumulator2.dml 
b/src/test/scripts/functions/parfor/parfor_accumulator2.dml
new file mode 100644
index 0000000..e81a4cc
--- /dev/null
+++ b/src/test/scripts/functions/parfor/parfor_accumulator2.dml
@@ -0,0 +1,34 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+rlen = $1;
+clen = $2;
+init = $3
+sparse = $4;
+
+R = matrix(ifelse(init, 7, 0), rlen, clen);
+if( sparse )
+   R[,50:300] = matrix(0, rlen, 251);
+
+parfor(i in 1:10, opt=CONSTRAINED, resultmerge=REMOTE_MR)
+   R += matrix(i, rlen, clen);
+
+write(R, $5);

http://git-wip-us.apache.org/repos/asf/systemml/blob/2ef6342f/src/test/scripts/functions/parfor/parfor_accumulator3.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/parfor/parfor_accumulator3.dml 
b/src/test/scripts/functions/parfor/parfor_accumulator3.dml
new file mode 100644
index 0000000..7418917
--- /dev/null
+++ b/src/test/scripts/functions/parfor/parfor_accumulator3.dml
@@ -0,0 +1,34 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+rlen = $1;
+clen = $2;
+init = $3
+sparse = $4;
+
+R = matrix(ifelse(init, 7, 0), rlen, clen);
+if( sparse )
+   R[,50:300] = matrix(0, rlen, 251);
+
+parfor(i in 1:10, opt=CONSTRAINED, resultmerge=REMOTE_SPARK)
+   R += matrix(i, rlen, clen);
+
+write(R, $5);

Reply via email to