Repository: systemml
Updated Branches:
  refs/heads/master c2dd05e51 -> 00d72a092


[SYSTEMML-2095] Extended local parfor result merge w/ accumulators

This patch realizes the runtime integration of accumulator result
variables and extends all local result merge implementations
accordingly. For += result merge without compare, we forward the core
operation to in-place binary plus operations, while for result merge
with compare, we extend the existing implementation: each merged cell
is computed as currentValue + (newValue - oldValue), which avoids
double counting the original value. Furthermore, this patch also
includes new tests for parfor with different types of += result merge.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/47498514
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/47498514
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/47498514

Branch: refs/heads/master
Commit: 474985143589e363aa58d215fce6abdc8d721bd0
Parents: c2dd05e
Author: Matthias Boehm <[email protected]>
Authored: Sat Jan 27 21:49:46 2018 -0800
Committer: Matthias Boehm <[email protected]>
Committed: Sat Jan 27 21:49:46 2018 -0800

----------------------------------------------------------------------
 .../sysml/parser/ParForStatementBlock.java      |  67 +++++++----
 .../controlprogram/ParForProgramBlock.java      |  61 +++++-----
 .../controlprogram/parfor/ParForBody.java       |  45 +++----
 .../controlprogram/parfor/ParWorker.java        |  42 +++----
 .../controlprogram/parfor/ProgramConverter.java |  40 +++++--
 .../parfor/RemoteDPParForSparkWorker.java       |   2 +-
 .../parfor/RemoteDPParWorkerReducer.java        |  10 +-
 .../parfor/RemoteParForSparkWorker.java         |   6 +-
 .../parfor/RemoteParForUtils.java               |  37 +++---
 .../parfor/RemoteParWorkerMapper.java           |   6 +-
 .../controlprogram/parfor/ResultMerge.java      |  45 ++++---
 .../parfor/ResultMergeLocalAutomatic.java       |  12 +-
 .../parfor/ResultMergeLocalFile.java            |  13 +-
 .../parfor/ResultMergeLocalMemory.java          |   4 +-
 .../parfor/ResultMergeRemoteMR.java             |  10 +-
 .../parfor/ResultMergeRemoteSpark.java          |  21 ++--
 .../parfor/opt/OptimizerConstrained.java        |   5 +-
 .../parfor/opt/OptimizerRuleBased.java          |  74 ++++++------
 .../runtime/instructions/InstructionUtils.java  |   3 +-
 .../sysml/runtime/matrix/data/MatrixBlock.java  |   6 -
 .../ParForAccumulatorResultMergeTest.java       | 119 +++++++++++++++++++
 .../functions/parfor/parfor_accumulator1.R      |  39 ++++++
 .../functions/parfor/parfor_accumulator1.dml    |  34 ++++++
 .../functions/parfor/ZPackageSuite.java         |   1 +
 24 files changed, 449 insertions(+), 253 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/47498514/src/main/java/org/apache/sysml/parser/ParForStatementBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/parser/ParForStatementBlock.java 
b/src/main/java/org/apache/sysml/parser/ParForStatementBlock.java
index 69027a5..6a39c7a 100644
--- a/src/main/java/org/apache/sysml/parser/ParForStatementBlock.java
+++ b/src/main/java/org/apache/sysml/parser/ParForStatementBlock.java
@@ -93,9 +93,9 @@ public class ParForStatementBlock extends ForStatementBlock
        
        //instance members
        private final long _ID;
-       private VariableSet       _vsParent   = null;
-       private ArrayList<String> _resultVars = null;
-       private Bounds            _bounds     = null;
+       private VariableSet          _vsParent   = null;
+       private ArrayList<ResultVar> _resultVars = null;
+       private Bounds               _bounds     = null;
        
        static
        {
@@ -159,11 +159,15 @@ public class ParForStatementBlock extends 
ForStatementBlock
                return _ID;
        }
 
-       public ArrayList<String> getResultVariables() {
+       public ArrayList<ResultVar> getResultVariables() {
                return _resultVars;
        }
        
-       private void addToResultVariablesNoDup( String var ) {
+       private void addToResultVariablesNoDup( String var, boolean accum ) {
+               addToResultVariablesNoDup(new ResultVar(var, accum));
+       }
+       
+       private void addToResultVariablesNoDup( ResultVar var ) {
                if( !_resultVars.contains( var ) )
                        _resultVars.add( var );
        }
@@ -344,16 +348,16 @@ public class ParForStatementBlock extends 
ForStatementBlock
                //a) add own candidates
                for( Candidate var : C )
                        if( check || var._dat.getDataType()!=DataType.SCALAR )
-                               addToResultVariablesNoDup( var._var );
+                               addToResultVariablesNoDup( var._var, 
var._isAccum );
                //b) get and add child result vars (if required)
-               ArrayList<String> tmp = new ArrayList<>();
+               ArrayList<ResultVar> tmp = new ArrayList<>();
                rConsolidateResultVars(pfs.getBody(), tmp);
-               for( String var : tmp )
-                       if(_vsParent.containsVariable(var))
-                               addToResultVariablesNoDup( var );
+               for( ResultVar var : tmp )
+                       if(_vsParent.containsVariable(var._name))
+                               addToResultVariablesNoDup(var);
                if( LDEBUG )
-                       for( String rvar : _resultVars )
-                               LOG.debug("INFO: PARFOR final result variable: 
"+rvar);
+                       for( ResultVar rvar : _resultVars )
+                               LOG.debug("INFO: PARFOR final result variable: 
"+rvar._name);
                
                //cleanup function cache in order to prevent side effects 
between parfor statements
                if( USE_FN_CACHE )
@@ -594,35 +598,25 @@ public class ParForStatementBlock extends 
ForStatementBlock
                        && l2.eval(1L) == l1._b[0] );                 //aligned 
intercept
        }
        
-       private void rConsolidateResultVars(ArrayList<StatementBlock> asb, 
ArrayList<String> vars) 
+       private void rConsolidateResultVars(ArrayList<StatementBlock> asb, 
ArrayList<ResultVar> vars) 
                throws LanguageException 
        {
                for(StatementBlock sb : asb ) // foreach statementblock in 
parforbody
                {
                        if( sb instanceof ParForStatementBlock )
-                       {
                                
vars.addAll(((ParForStatementBlock)sb).getResultVariables());
-                       }
                        
-                       for( Statement s : sb._statements ) // foreach 
statement in statement block
-                       {
+                       for( Statement s : sb._statements ) {
                                if( s instanceof ForStatement || s instanceof 
ParForStatement )
-                               {
                                        
rConsolidateResultVars(((ForStatement)s).getBody(), vars);
-                               }
                                else if( s instanceof WhileStatement ) 
-                               {
                                        
rConsolidateResultVars(((WhileStatement)s).getBody(), vars);
-                               }
-                               else if( s instanceof IfStatement ) 
-                               {
+                               else if( s instanceof IfStatement ) {
                                        
rConsolidateResultVars(((IfStatement)s).getIfBody(), vars);
                                        
rConsolidateResultVars(((IfStatement)s).getElseBody(), vars);
                                }
                                else if( s instanceof FunctionStatement ) 
-                               {
                                        
rConsolidateResultVars(((FunctionStatement)s).getBody(), vars);
-                               }
                        }
                }
        }
@@ -1809,6 +1803,29 @@ public class ParForStatementBlock extends 
ForStatementBlock
                return ret;
        }
        
+       public static class ResultVar {
+               public final String _name;
+               public final boolean _isAccum;
+               public ResultVar(String name, boolean accum) {
+                       _name = name;
+                       _isAccum = accum;
+               }
+               @Override
+               public boolean equals(Object that) {
+                       if( !(that instanceof ResultVar) )
+                               return false;
+                       return _name.equals(((ResultVar)that)._name);
+               }
+               @Override
+               public int hashCode() {
+                       return _name.hashCode();
+               }
+               @Override
+               public String toString() {
+                       return _name;
+               }
+       }
+       
        private static class Candidate  {
                private final String _var;          // variable name
                private final DataIdentifier _dat;  // _var data identifier

http://git-wip-us.apache.org/repos/asf/systemml/blob/47498514/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java 
b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
index 88ce415..7348266 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
@@ -30,6 +30,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import org.apache.hadoop.fs.FileSystem;
@@ -49,6 +50,7 @@ import org.apache.sysml.parser.DataIdentifier;
 import org.apache.sysml.parser.Expression.DataType;
 import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.parser.ParForStatementBlock;
+import org.apache.sysml.parser.ParForStatementBlock.ResultVar;
 import org.apache.sysml.parser.StatementBlock;
 import org.apache.sysml.parser.VariableSet;
 import org.apache.sysml.runtime.DMLRuntimeException;
@@ -355,7 +357,7 @@ public class ParForProgramBlock extends ForProgramBlock
        protected Collection<String> _variablesECache = null;
        
        // program block meta data
-       protected final ArrayList<String> _resultVars;
+       protected final ArrayList<ResultVar> _resultVars;
        protected final IDSequence _resultVarsIDSeq;
        protected final IDSequence _dpVarsIDSeq;
        protected final boolean _hasFunctions;
@@ -368,7 +370,7 @@ public class ParForProgramBlock extends ForProgramBlock
        protected HashMap<Long,ArrayList<ProgramBlock>> _pbcache = null;
        protected long[] _pwIDs = null;
        
-       public ParForProgramBlock(Program prog, String iterPredVar, 
HashMap<String,String> params, ArrayList<String> resultVars)
+       public ParForProgramBlock(Program prog, String iterPredVar, 
HashMap<String,String> params, ArrayList<ResultVar> resultVars)
                throws DMLRuntimeException 
        {
                this( -1, prog, iterPredVar, params, resultVars);
@@ -385,7 +387,7 @@ public class ParForProgramBlock extends ForProgramBlock
         * @param resultVars list of result variable names
         * @throws DMLRuntimeException if DMLRuntimeException occurs
         */
-       public ParForProgramBlock(int ID, Program prog, String iterPredVar, 
HashMap<String,String> params, ArrayList<String> resultVars) 
+       public ParForProgramBlock(int ID, Program prog, String iterPredVar, 
HashMap<String,String> params, ArrayList<ResultVar> resultVars) 
        {
                super(prog, iterPredVar);
 
@@ -457,7 +459,7 @@ public class ParForProgramBlock extends ForProgramBlock
                        UtilFunctions.unquote(tmp).toUpperCase();
        }
 
-       public ArrayList<String> getResultVariables() {
+       public ArrayList<ResultVar> getResultVariables() {
                return _resultVars;
        }
        
@@ -810,7 +812,8 @@ public class ParForProgramBlock extends ForProgramBlock
                        LocalVariableMap [] localVariables = new 
LocalVariableMap [_numThreads]; 
                        for( int i=0; i<_numThreads; i++ ) {
                                localVariables[i] = workers[i].getVariables();
-                               localVariables[i].removeAllNotIn(new 
HashSet<>(_resultVars));
+                               
localVariables[i].removeAllNotIn(_resultVars.stream()
+                                       .map(v -> 
v._name).collect(Collectors.toSet()));
                                numExecutedTasks += 
workers[i].getExecutedTasks();
                                numExecutedIterations += 
workers[i].getExecutedIterations();
                        }
@@ -1489,7 +1492,7 @@ public class ParForProgramBlock extends ForProgramBlock
                return dp;
        }
 
-       private ResultMerge createResultMerge( PResultMerge prm, MatrixObject 
out, MatrixObject[] in, String fname, ExecutionContext ec ) 
+       private ResultMerge createResultMerge( PResultMerge prm, MatrixObject 
out, MatrixObject[] in, String fname, boolean accum, ExecutionContext ec ) 
                throws DMLRuntimeException 
        {
                ResultMerge rm = null;
@@ -1518,21 +1521,22 @@ public class ParForProgramBlock extends ForProgramBlock
                switch( prm )
                {
                        case LOCAL_MEM:
-                               rm = new ResultMergeLocalMemory( out, in, fname 
);
+                               rm = new ResultMergeLocalMemory( out, in, 
fname, accum );
                                break;
                        case LOCAL_FILE:
-                               rm = new ResultMergeLocalFile( out, in, fname );
+                               rm = new ResultMergeLocalFile( out, in, fname, 
accum );
                                break;
                        case LOCAL_AUTOMATIC:
-                               rm = new ResultMergeLocalAutomatic( out, in, 
fname );
+                               rm = new ResultMergeLocalAutomatic( out, in, 
fname, accum );
                                break;
                        case REMOTE_MR:
-                               rm = new ResultMergeRemoteMR( out, in, fname, 
+                               rm = new ResultMergeRemoteMR( out, in, fname, 
accum, 
                                        _ID, numMap, numRed, 
WRITE_REPLICATION_FACTOR,
                                        MAX_RETRYS_ON_ERROR, 
ALLOW_REUSE_MR_JVMS );
                                break;
                        case REMOTE_SPARK:
-                               rm = new ResultMergeRemoteSpark( out, in, 
fname, ec, numMap, numRed );
+                               rm = new ResultMergeRemoteSpark( out, in,
+                                       fname, accum, ec, numMap, numRed );
                                break;
                                
                        default:
@@ -1657,10 +1661,11 @@ public class ParForProgramBlock extends ForProgramBlock
                        try
                        {
                                //enqueue all result vars as tasks
-                               LocalTaskQueue<String> q = new 
LocalTaskQueue<>();
-                               for( String var : _resultVars ) //foreach 
non-local write
-                                       if( ec.getVariable(var) instanceof 
MatrixObject ) //robustness scalars
+                               LocalTaskQueue<ResultVar> q = new 
LocalTaskQueue<>();
+                               for( ResultVar var : _resultVars ) { //foreach 
non-local write
+                                       if( ec.getVariable(var._name) 
instanceof MatrixObject ) //robustness scalars
                                                q.enqueueTask(var);
+                               }
                                q.closeInput();
                                
                                //run result merge workers
@@ -1683,17 +1688,17 @@ public class ParForProgramBlock extends ForProgramBlock
                else
                {
                        //execute result merge sequentially for all result vars
-                       for( String var : _resultVars ) //foreach non-local 
write
+                       for( ResultVar var : _resultVars ) //foreach non-local 
write
                        {
-                               Data dat = ec.getVariable(var);
+                               Data dat = ec.getVariable(var._name);
                                if( dat instanceof MatrixObject ) //robustness 
scalars
                                {
                                        MatrixObject out = (MatrixObject) dat;
                                        MatrixObject[] in = new MatrixObject[ 
results.length ];
                                        for( int i=0; i< results.length; i++ )
-                                               in[i] = (MatrixObject) 
results[i].get( var );
+                                               in[i] = (MatrixObject) 
results[i].get( var._name );
                                        String fname = 
constructResultMergeFileName();
-                                       ResultMerge rm = 
createResultMerge(_resultMerge, out, in, fname, ec);
+                                       ResultMerge rm = 
createResultMerge(_resultMerge, out, in, fname, var._isAccum, ec);
                                        MatrixObject outNew = null;
                                        if( USE_PARALLEL_RESULT_MERGE )
                                                outNew = 
rm.executeParallelMerge( _numThreads );
@@ -1701,7 +1706,7 @@ public class ParForProgramBlock extends ForProgramBlock
                                                outNew = 
rm.executeSerialMerge();
                                        
                                        //cleanup existing var
-                                       Data exdata = ec.removeVariable(var);
+                                       Data exdata = 
ec.removeVariable(var._name);
                                        if( exdata != null && exdata != outNew 
&& exdata instanceof MatrixObject )
                                                
ec.cleanupCacheableData((MatrixObject)exdata);
                                        
@@ -1709,7 +1714,7 @@ public class ParForProgramBlock extends ForProgramBlock
                                        cleanWorkerResultVariables( ec, out, in 
);
                                        
                                        //set merged result variable
-                                       ec.setVariable(var, outNew);
+                                       ec.setVariable(var._name, outNew);
                                }
                        }
                }
@@ -1907,12 +1912,12 @@ public class ParForProgramBlock extends ForProgramBlock
         */
        private class ResultMergeWorker extends Thread
        {
-               private LocalTaskQueue<String> _q = null;
+               private LocalTaskQueue<ResultVar> _q = null;
                private LocalVariableMap[] _refVars = null;
                private ExecutionContext _ec = null;
                private boolean _success = false;
                
-               public ResultMergeWorker( LocalTaskQueue<String> q, 
LocalVariableMap[] results, ExecutionContext ec )
+               public ResultMergeWorker( LocalTaskQueue<ResultVar> q, 
LocalVariableMap[] results, ExecutionContext ec )
                {
                        _q = q;
                        _refVars = results;
@@ -1930,21 +1935,21 @@ public class ParForProgramBlock extends ForProgramBlock
                        {
                                while( true ) 
                                {
-                                       String varname = _q.dequeueTask();
-                                       if( varname == 
LocalTaskQueue.NO_MORE_TASKS ) // task queue closed (no more tasks)
+                                       ResultVar var = _q.dequeueTask();
+                                       if( var == LocalTaskQueue.NO_MORE_TASKS 
) // task queue closed (no more tasks)
                                                break;
                                
                                        MatrixObject out = null;
                                        synchronized( _ec.getVariables() ){
-                                               out = 
_ec.getMatrixObject(varname);
+                                               out = 
_ec.getMatrixObject(var._name);
                                        }
                                        
                                        MatrixObject[] in = new MatrixObject[ 
_refVars.length ];
                                        for( int i=0; i< _refVars.length; i++ )
-                                               in[i] = (MatrixObject) 
_refVars[i].get( varname ); 
+                                               in[i] = (MatrixObject) 
_refVars[i].get( var._name ); 
                                        String fname = 
constructResultMergeFileName();
                                
-                                       ResultMerge rm = 
createResultMerge(_resultMerge, out, in, fname, _ec);
+                                       ResultMerge rm = 
createResultMerge(_resultMerge, out, in, fname, var._isAccum, _ec);
                                        MatrixObject outNew = null;
                                        if( USE_PARALLEL_RESULT_MERGE )
                                                outNew = 
rm.executeParallelMerge( _numThreads );
@@ -1952,7 +1957,7 @@ public class ParForProgramBlock extends ForProgramBlock
                                                outNew = 
rm.executeSerialMerge();
                                        
                                        synchronized( _ec.getVariables() ){
-                                               _ec.getVariables().put( 
varname, outNew);
+                                               _ec.getVariables().put( 
var._name, outNew);
                                        }
                
                                        //cleanup of intermediate result 
variables

http://git-wip-us.apache.org/repos/asf/systemml/blob/47498514/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ParForBody.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ParForBody.java 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ParForBody.java
index 2d78895..3ff9e74 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ParForBody.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ParForBody.java
@@ -21,6 +21,7 @@ package org.apache.sysml.runtime.controlprogram.parfor;
 
 import java.util.ArrayList;
 
+import org.apache.sysml.parser.ParForStatementBlock.ResultVar;
 import org.apache.sysml.runtime.controlprogram.LocalVariableMap;
 import org.apache.sysml.runtime.controlprogram.ProgramBlock;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
@@ -31,57 +32,45 @@ import 
org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
  */
 public class ParForBody 
 {
-
-       
-       private ArrayList<String>       _resultVarNames;
+       private ArrayList<ResultVar>    _resultVars;
        private ArrayList<ProgramBlock> _childBlocks;
-       private ExecutionContext                _ec;
+       private ExecutionContext        _ec;
        
-       public ParForBody()
-       {
-               
-       }
+       public ParForBody() {}
 
        public ParForBody( ArrayList<ProgramBlock> childBlocks, 
-                       ArrayList<String> resultVarNames, ExecutionContext ec ) 
+                       ArrayList<ResultVar> resultVars, ExecutionContext ec ) 
        {
-               _resultVarNames = resultVarNames;
-               _childBlocks    = childBlocks;
-               _ec             = ec;
+               _resultVars  = resultVars;
+               _childBlocks = childBlocks;
+               _ec          = ec;
        }
 
-       public LocalVariableMap getVariables() 
-       {
+       public LocalVariableMap getVariables() {
                return _ec.getVariables();
        }
 
-       public ArrayList<String> getResultVarNames() 
-       {
-               return _resultVarNames;
+       public ArrayList<ResultVar> getResultVariables() {
+               return _resultVars;
        }
 
-       public void setResultVarNames(ArrayList<String> resultVarNames) 
-       {
-               _resultVarNames = resultVarNames;
+       public void setResultVariables(ArrayList<ResultVar> resultVars) {
+               _resultVars = resultVars;
        }
 
-       public ArrayList<ProgramBlock> getChildBlocks() 
-       {
+       public ArrayList<ProgramBlock> getChildBlocks() {
                return _childBlocks;
        }
 
-       public void setChildBlocks(ArrayList<ProgramBlock> childBlocks) 
-       {
+       public void setChildBlocks(ArrayList<ProgramBlock> childBlocks) {
                _childBlocks = childBlocks;
        }
 
-       public ExecutionContext getEc() 
-       {
+       public ExecutionContext getEc() {
                return _ec;
        }
 
-       public void setEc(ExecutionContext ec) 
-       {
+       public void setEc(ExecutionContext ec) {
                _ec = ec;
        }
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/47498514/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ParWorker.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ParWorker.java 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ParWorker.java
index 281ce07..cf5cbcf 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ParWorker.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ParWorker.java
@@ -24,7 +24,7 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
+import org.apache.sysml.parser.ParForStatementBlock.ResultVar;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.LocalVariableMap;
 import org.apache.sysml.runtime.controlprogram.ProgramBlock;
@@ -43,11 +43,9 @@ import org.apache.sysml.runtime.instructions.cp.IntObject;
  */
 public abstract class ParWorker
 {
-       
        protected static final Log LOG = 
LogFactory.getLog(ParWorker.class.getName());
        
        protected long                      _workerID    = -1;
-       
        protected ArrayList<ProgramBlock>   _childBlocks = null;
 
        public ExecutionContext getExecutionContext() {
@@ -55,34 +53,28 @@ public abstract class ParWorker
        }
 
        protected ExecutionContext          _ec          = null;
-       protected ArrayList<String>         _resultVars  = null;
+       protected ArrayList<ResultVar>      _resultVars  = null;
 
        protected boolean                   _monitor     = false;
        
        protected long                      _numTasks    = -1;
        protected long                      _numIters    = -1;
        
-       public ParWorker()
-       {
+       public ParWorker() {
                //implicit constructor (required if parameters not known on 
object creation, 
                //e.g., RemoteParWorkerMapper)
        }
        
-       public ParWorker( long ID, ParForBody body, boolean monitor )
-       {
+       public ParWorker( long ID, ParForBody body, boolean monitor ) {
                _workerID    = ID;
-               
-               if( body != null )
-               {
+               if( body != null ) {
                        _childBlocks = body.getChildBlocks();
-                       _ec          = body.getEc();
-                       _resultVars  = body.getResultVarNames();
+                       _ec = body.getEc();
+                       _resultVars = body.getResultVariables();
                }
-               
                _monitor = monitor;
-               
-               _numTasks    = 0;
-               _numIters    = 0;
+               _numTasks = 0;
+               _numIters = 0;
        }
 
        public LocalVariableMap getVariables()
@@ -107,21 +99,15 @@ public abstract class ParWorker
         * 
         * @return number of executed iterations
         */
-       public long getExecutedIterations()
-       {
+       public long getExecutedIterations() {
                return _numIters;
        }
 
-       protected void pinResultVariables()
-       {
-               for( String var : _resultVars )
-               {
-                       Data dat = _ec.getVariable(var);
+       protected void pinResultVariables() {
+               for( ResultVar var : _resultVars ) {
+                       Data dat = _ec.getVariable(var._name);
                        if( dat instanceof MatrixObject )
-                       {
-                               MatrixObject mo = (MatrixObject)dat;
-                               mo.enableCleanup(false); 
-                       }
+                               ((MatrixObject)dat).enableCleanup(false);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/systemml/blob/47498514/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
index ea3deb5..d5cd518 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
@@ -45,6 +45,7 @@ import org.apache.sysml.parser.ParForStatementBlock;
 import org.apache.sysml.parser.StatementBlock;
 import org.apache.sysml.parser.Expression.DataType;
 import org.apache.sysml.parser.Expression.ValueType;
+import org.apache.sysml.parser.ParForStatementBlock.ResultVar;
 import org.apache.sysml.parser.WhileStatementBlock;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.codegen.CodegenUtils;
@@ -726,7 +727,7 @@ public class ProgramConverter
                throws DMLRuntimeException
        {
                ArrayList<ProgramBlock> pbs = body.getChildBlocks();
-               ArrayList<String> rVnames = body.getResultVarNames();
+               ArrayList<ResultVar> rVnames = body.getResultVariables();
                ExecutionContext ec = body.getEc();
                
                if( pbs.isEmpty() )
@@ -741,7 +742,7 @@ public class ProgramConverter
                //handle DMLScript UUID (propagate original uuid for writing to 
scratch space)
                sb.append( DMLScript.getUUID() );
                sb.append( COMPONENTS_DELIM );
-               sb.append( NEWLINE );           
+               sb.append( NEWLINE );
                
                //handle DML config
                sb.append( 
ConfigurationManager.getDMLConfig().serializeDMLConfig() );
@@ -763,7 +764,7 @@ public class ProgramConverter
                sb.append( NEWLINE );
                
                //handle result variable names
-               sb.append( serializeStringArrayList(rVnames) );
+               sb.append( serializeResultVariables(rVnames) );
                sb.append( COMPONENTS_DELIM );
                
                //handle execution context
@@ -1024,6 +1025,18 @@ public class ProgramConverter
                return sb.toString();
        }
 
+       public static String serializeResultVariables( ArrayList<ResultVar> 
vars) {
+               StringBuilder sb = new StringBuilder();
+               int count=0;
+               for( ResultVar var : vars ) {
+                       if(count>0)
+                               sb.append( ELEMENT_DELIM );
+                       sb.append( var._isAccum ? var._name+"+" : var._name );
+                       count++;
+               }
+               return sb.toString();
+       }
+       
        public static String serializeStringArrayList( ArrayList<String> vars)
        {
                StringBuilder sb = new StringBuilder();
@@ -1179,7 +1192,7 @@ public class ProgramConverter
                        
                        sb.append( pfpb.getIterVar() );
                        sb.append( COMPONENTS_DELIM );
-                       sb.append( serializeStringArrayList( 
pfpb.getResultVariables()) );
+                       sb.append( serializeResultVariables( 
pfpb.getResultVariables()) );
                        sb.append( COMPONENTS_DELIM );
                        sb.append( serializeStringHashMap( 
pfpb.getParForParams()) ); //parameters of nested parfor
                        sb.append( COMPONENTS_DELIM );
@@ -1323,13 +1336,13 @@ public class ProgramConverter
                
                //handle result variable names
                String rvarStr = st.nextToken();
-               ArrayList<String> rvars = parseStringArrayList(rvarStr);
-               body.setResultVarNames(rvars);
+               ArrayList<ResultVar> rvars = parseResultVariables(rvarStr);
+               body.setResultVariables(rvars);
                
                //handle execution context
                String ecStr = st.nextToken();
                ExecutionContext ec = parseExecutionContext( ecStr, prog );
-                       
+               
                //handle program blocks
                String spbs = st.nextToken();
                ArrayList<ProgramBlock> pbs = rParseProgramBlocks(spbs, prog, 
id);
@@ -1337,7 +1350,7 @@ public class ProgramConverter
                body.setChildBlocks( pbs );
                body.setEc( ec );
                
-               return body;            
+               return body;
        }
 
        public static Program parseProgram( String in, int id ) 
@@ -1499,7 +1512,7 @@ public class ProgramConverter
                
                //inputs
                String iterVar = st.nextToken();
-               ArrayList<String> resultVars = 
parseStringArrayList(st.nextToken());
+               ArrayList<ResultVar> resultVars = 
parseResultVariables(st.nextToken());
                HashMap<String,String> params = 
parseStringHashMap(st.nextToken());
                
                //instructions 
@@ -1639,6 +1652,15 @@ public class ProgramConverter
                }
                return insts;
        }
+       
+       private static ArrayList<ResultVar> parseResultVariables(String in) {
+               ArrayList<ResultVar> ret = new ArrayList<>();
+               for(String var : parseStringArrayList(in)) {
+                       boolean accum = var.endsWith("+");
+                       ret.add(new ResultVar(accum ? var.substring(0, 
var.length()-1) : var, accum));
+               }
+               return ret;
+       }
 
        private static HashMap<String,String> parseStringHashMap( String in ) {
                HashMap<String,String> vars = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/systemml/blob/47498514/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
index 06be2b1..52ac922 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
@@ -154,7 +154,7 @@ public class RemoteDPParForSparkWorker extends ParWorker 
implements PairFlatMapF
                ParForBody body = ProgramConverter.parseParForBody(_prog, 
(int)_workerID);
                _childBlocks = body.getChildBlocks();
                _ec          = body.getEc();
-               _resultVars  = body.getResultVarNames();
+               _resultVars  = body.getResultVariables();
                _numTasks    = 0;
                _numIters    = 0;
 

http://git-wip-us.apache.org/repos/asf/systemml/blob/47498514/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java
index 4c05791..6086d25 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java
@@ -154,8 +154,8 @@ public class RemoteDPParWorkerReducer extends ParWorker
                        String in = MRJobConfiguration.getProgramBlocks(job);
                        ParForBody body = ProgramConverter.parseParForBody(in, 
(int)_workerID);
                        _childBlocks = body.getChildBlocks();
-                       _ec          = body.getEc();                            
-                       _resultVars  = body.getResultVarNames();
+                       _ec          = body.getEc();
+                       _resultVars  = body.getResultVariables();
        
                        //init local cache manager 
                        if( !CacheableData.isCachingActive() ) {
@@ -176,7 +176,7 @@ public class RemoteDPParWorkerReducer extends ParWorker
                                CacheableData.disableCaching();
 
                        _numTasks    = 0;
-                       _numIters    = 0;                       
+                       _numIters    = 0;
                }
                catch(Exception ex)
                {
@@ -192,8 +192,8 @@ public class RemoteDPParWorkerReducer extends ParWorker
        }
 
        @Override
-       public void close() 
-           throws IOException 
+       public void close()
+           throws IOException
        {
                try
                {

http://git-wip-us.apache.org/repos/asf/systemml/blob/47498514/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
index 52a19f0..41dc53c 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
@@ -25,6 +25,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map.Entry;
+import java.util.stream.Collectors;
 
 import org.apache.spark.TaskContext;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
@@ -107,14 +108,15 @@ public class RemoteParForSparkWorker extends ParWorker 
implements PairFlatMapFun
                ParForBody body = ProgramConverter.parseParForBody(_prog, 
(int)_workerID);
                _childBlocks = body.getChildBlocks();
                _ec          = body.getEc();
-               _resultVars  = body.getResultVarNames();
+               _resultVars  = body.getResultVariables();
                _numTasks    = 0;
                _numIters    = 0;
                
                //reuse shared inputs (to read shared inputs once per process 
instead of once per core; 
                //we reuse everything except result variables and partitioned 
input matrices)
                _ec.pinVariables(_ec.getVarList()); //avoid cleanup of shared 
inputs
-               Collection<String> blacklist = UtilFunctions.asSet(_resultVars, 
_ec.getVarListPartitioned());
+               Collection<String> blacklist = 
UtilFunctions.asSet(_resultVars.stream()
+                       .map(v -> v._name).collect(Collectors.toList()), 
_ec.getVarListPartitioned());
                reuseVars.reuseVariables(_jobid, _ec.getVariables(), blacklist);
                
                //init and register-cleanup of buffer pool (in parfor spark, 
multiple tasks might 

http://git-wip-us.apache.org/repos/asf/systemml/blob/47498514/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java
index 30c57b8..7c24ae5 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java
@@ -37,6 +37,7 @@ import scala.Tuple2;
 import org.apache.sysml.api.DMLScript;
 import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.parser.Expression.DataType;
+import org.apache.sysml.parser.ParForStatementBlock.ResultVar;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.LocalVariableMap;
 import org.apache.sysml.runtime.controlprogram.ParForProgramBlock;
@@ -89,11 +90,11 @@ public class RemoteParForUtils
                }
        }
 
-       public static void exportResultVariables( long workerID, 
LocalVariableMap vars, ArrayList<String> resultVars, OutputCollector<Writable, 
Writable> out ) 
+       public static void exportResultVariables( long workerID, 
LocalVariableMap vars, ArrayList<ResultVar> resultVars, 
OutputCollector<Writable, Writable> out ) 
                        throws DMLRuntimeException, IOException
        {
                exportResultVariables(workerID, vars, resultVars, null, out);
-       }       
+       }
        
        /**
         * For remote MR parfor workers.
@@ -106,7 +107,7 @@ public class RemoteParForUtils
         * @throws DMLRuntimeException if DMLRuntimeException occurs
         * @throws IOException if IOException occurs
         */
-       public static void exportResultVariables( long workerID, 
LocalVariableMap vars, ArrayList<String> resultVars, 
+       public static void exportResultVariables( long workerID, 
LocalVariableMap vars, ArrayList<ResultVar> resultVars, 
                                                          
HashMap<String,String> rvarFnames, OutputCollector<Writable, Writable> out ) 
                throws DMLRuntimeException, IOException
        {
@@ -115,9 +116,9 @@ public class RemoteParForUtils
                Text ovalue = new Text();
                
                //foreach result variables probe if export necessary
-               for( String rvar : resultVars )
+               for( ResultVar rvar : resultVars )
                {
-                       Data dat = vars.get( rvar );
+                       Data dat = vars.get( rvar._name );
                        
                        //export output variable to HDFS (see RunMRJobs)
                        if ( dat != null && dat.getDataType() == 
DataType.MATRIX ) 
@@ -127,13 +128,13 @@ public class RemoteParForUtils
                                {
                                        if( 
ParForProgramBlock.ALLOW_REUSE_MR_PAR_WORKER && rvarFnames!=null )
                                        {
-                                               String fname = rvarFnames.get( 
rvar );
+                                               String fname = rvarFnames.get( 
rvar._name );
                                                if( fname!=null )
                                                        mo.setFileName( fname );
                                                        
                                                //export result var (iff 
actually modified in parfor)
                                                mo.exportData(); //note: this 
is equivalent to doing it in close (currently not required because 1 Task=1Map 
tasks, hence only one map invocation)              
-                                               rvarFnames.put(rvar, 
mo.getFileName()); 
+                                               rvarFnames.put(rvar._name, 
mo.getFileName());
                                        }
                                        else
                                        {
@@ -143,7 +144,7 @@ public class RemoteParForUtils
                                        
                                        //pass output vars (scalars by value, 
matrix by ref) to result
                                        //(only if actually exported, hence in 
check for dirty, otherwise potential problems in result merge)
-                                       String datStr = 
ProgramConverter.serializeDataObject(rvar, mo);
+                                       String datStr = 
ProgramConverter.serializeDataObject(rvar._name, mo);
                                        ovalue.set( datStr );
                                        out.collect( okey, ovalue );
                                }
@@ -161,31 +162,25 @@ public class RemoteParForUtils
         * @throws DMLRuntimeException if DMLRuntimeException occurs
         * @throws IOException if IOException occurs
         */
-       public static ArrayList<String> exportResultVariables( long workerID, 
LocalVariableMap vars, ArrayList<String> resultVars) 
+       public static ArrayList<String> exportResultVariables( long workerID, 
LocalVariableMap vars, ArrayList<ResultVar> resultVars) 
                throws DMLRuntimeException, IOException
        {
                ArrayList<String> ret = new ArrayList<>();
                
                //foreach result variables probe if export necessary
-               for( String rvar : resultVars )
-               {
-                       Data dat = vars.get( rvar );
-                       
+               for( ResultVar rvar : resultVars ) {
+                       Data dat = vars.get( rvar._name );
                        //export output variable to HDFS (see RunMRJobs)
-                       if ( dat != null && dat.getDataType() == 
DataType.MATRIX ) 
-                       {
+                       if ( dat != null && dat.getDataType() == 
DataType.MATRIX )  {
                                MatrixObject mo = (MatrixObject) dat;
-                               if( mo.isDirty() )
-                               {
+                               if( mo.isDirty() ) {
                                        //export result var (iff actually 
modified in parfor)
                                        mo.exportData(); 
-                                       
-                                       
                                        //pass output vars (scalars by value, 
matrix by ref) to result
                                        //(only if actually exported, hence in 
check for dirty, otherwise potential problems in result merge)
-                                       ret.add( 
ProgramConverter.serializeDataObject(rvar, mo) );
+                                       ret.add( 
ProgramConverter.serializeDataObject(rvar._name, mo) );
                                }
-                       }       
+                       }
                }
                
                return ret;

http://git-wip-us.apache.org/repos/asf/systemml/blob/47498514/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParWorkerMapper.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParWorkerMapper.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParWorkerMapper.java
index 730adbd..7029061 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParWorkerMapper.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParWorkerMapper.java
@@ -164,15 +164,15 @@ public class RemoteParWorkerMapper extends ParWorker  
//MapReduceBase not requir
                                String in = 
MRJobConfiguration.getProgramBlocks(job);
                                ParForBody body = 
ProgramConverter.parseParForBody(in, (int)_workerID);
                                _childBlocks = body.getChildBlocks();
-                               _ec          = body.getEc();                    
        
-                               _resultVars  = body.getResultVarNames();
+                               _ec          = body.getEc();
+                               _resultVars  = body.getResultVariables();
                
                                //init local cache manager 
                                if( !CacheableData.isCachingActive() ) {
                                        String uuid = 
IDHandler.createDistributedUniqueID();
                                        
LocalFileUtils.createWorkingDirectoryWithUUID( uuid );
                                        CacheableData.initCaching( uuid ); 
//incl activation, cache dir creation (each map task gets its own dir for 
simplified cleanup)
-                               }                               
+                               }
                                if( 
!CacheableData.cacheEvictionLocalFilePrefix.contains("_") ){ //account for 
local mode
                                        
CacheableData.cacheEvictionLocalFilePrefix = 
CacheableData.cacheEvictionLocalFilePrefix +"_" + _workerID; 
                                }

http://git-wip-us.apache.org/repos/asf/systemml/blob/47498514/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMerge.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMerge.java 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMerge.java
index ca7aad6..5e814fa 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMerge.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMerge.java
@@ -26,8 +26,10 @@ import org.apache.commons.logging.LogFactory;
 
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.matrix.data.DenseBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.operators.BinaryOperator;
 
 /**
  * Due to independence of all iterations, any result has the following 
properties:
@@ -38,22 +40,24 @@ import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 public abstract class ResultMerge 
 {
        protected static final Log LOG = 
LogFactory.getLog(ResultMerge.class.getName());
-       
        protected static final String NAME_SUFFIX = "_rm";
+       protected static final BinaryOperator PLUS = 
InstructionUtils.parseBinaryOperator("+");
        
        //inputs to result merge
        protected MatrixObject   _output      = null;
        protected MatrixObject[] _inputs      = null; 
        protected String         _outputFName = null;
+       protected boolean        _isAccum     = false;
        
        protected ResultMerge( ) {
                //do nothing
        }
        
-       public ResultMerge( MatrixObject out, MatrixObject[] in, String 
outputFilename ) {
+       public ResultMerge( MatrixObject out, MatrixObject[] in, String 
outputFilename, boolean accum ) {
                _output = out;
                _inputs = in;
                _outputFName = outputFilename;
+               _isAccum = accum;
        }
        
        /**
@@ -89,7 +93,10 @@ public abstract class ResultMerge
         */
        protected void mergeWithoutComp( MatrixBlock out, MatrixBlock in, 
boolean appendOnly ) throws DMLRuntimeException {
                //pass through to matrix block operations
-               out.merge(in, appendOnly);
+               if( _isAccum )
+                       out.binaryOperationsInPlace(PLUS, in);
+               else
+                       out.merge(in, appendOnly);
        }
 
        /**
@@ -109,44 +116,32 @@ public abstract class ResultMerge
                //   (using sparse iterator would miss values set to 0) 
                // * Explicit NaN awareness because for cases were original 
matrix contains
                //   NaNs, since NaN != NaN, otherwise we would potentially 
overwrite results
+               // * For the case of accumulation, we add out += (new-old) to 
ensure correct results
+               //   because all inputs have the old values replicated
                
                if( in.isEmptyBlock(false) ) {
+                       if( _isAccum ) return; //nothing to do
                        for( int i=0; i<in.getNumRows(); i++ )
                                for( int j=0; j<in.getNumColumns(); j++ )
                                        if( compare.get(i, j) != 0 )
                                                out.quickSetValue(i, j, 0);
                }
-               else if( in.isInSparseFormat() ) { //SPARSE
+               else { //SPARSE/DENSE
                        int rows = in.getNumRows();
                        int cols = in.getNumColumns();
                        for( int i=0; i<rows; i++ )
                                for( int j=0; j<cols; j++ ) {
-                                       double value = 
in.getValueSparseUnsafe(i,j);  //input value
-                                       if(   (value != compare.get(i,j) && 
!Double.isNaN(value) )     //for new values only (div)
-                                               || Double.isNaN(value) != 
Double.isNaN(compare.get(i,j)) ) //NaN awareness 
+                                       double valOld = compare.get(i,j);
+                                       double valNew = in.quickGetValue(i,j); 
//input value
+                                       if( (valNew != valOld && 
!Double.isNaN(valNew) )      //for changed values 
+                                               || Double.isNaN(valNew) != 
Double.isNaN(valOld) ) //NaN awareness 
                                        {
+                                               double value = !_isAccum ? 
valNew :
+                                                       (out.quickGetValue(i, 
j) + (valNew - valOld));
                                                out.quickSetValue(i, j, value);
                                        }
                                }
                }
-               else { //DENSE
-                       //guaranteed allocated due to empty case above
-                       DenseBlock a = in.getDenseBlock();
-                       int rows = in.getNumRows();
-                       int cols = in.getNumColumns();
-                       for( int i=0; i<rows; i++ ) {
-                               double[] avals = a.values(i);
-                               int aix = a.pos(i);
-                               for( int j=0; j<cols; j++ ) {
-                                       double value = avals[aix+j]; //input 
value
-                                       if( (value != compare.get(i,j) && 
!Double.isNaN(value) ) //for new values only (div)
-                                               || Double.isNaN(value) != 
Double.isNaN(compare.get(i,j)) ) //NaN awareness
-                                       {
-                                               out.quickSetValue( i, j, value 
);
-                                       }
-                               }
-                       }
-               }
        }
 
        protected long computeNonZeros( MatrixObject out, List<MatrixObject> in 
) {

http://git-wip-us.apache.org/repos/asf/systemml/blob/47498514/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalAutomatic.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalAutomatic.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalAutomatic.java
index fec05c7..e9d178a 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalAutomatic.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalAutomatic.java
@@ -31,8 +31,8 @@ public class ResultMergeLocalAutomatic extends ResultMerge
 {
        private ResultMerge _rm = null;
        
-       public ResultMergeLocalAutomatic( MatrixObject out, MatrixObject[] in, 
String outputFilename ) {
-               super( out, in, outputFilename );
+       public ResultMergeLocalAutomatic( MatrixObject out, MatrixObject[] in, 
String outputFilename, boolean accum ) {
+               super( out, in, outputFilename, accum );
        }
 
        @Override
@@ -46,9 +46,9 @@ public class ResultMergeLocalAutomatic extends ResultMerge
                long cols = mc.getCols();
                
                if( OptimizerRuleBased.isInMemoryResultMerge(rows, cols, 
OptimizerUtils.getLocalMemBudget()) )
-                       _rm = new ResultMergeLocalMemory( _output, _inputs, 
_outputFName );
+                       _rm = new ResultMergeLocalMemory( _output, _inputs, 
_outputFName, _isAccum );
                else
-                       _rm = new ResultMergeLocalFile( _output, _inputs, 
_outputFName );
+                       _rm = new ResultMergeLocalFile( _output, _inputs, 
_outputFName, _isAccum );
                
                MatrixObject ret = _rm.executeSerialMerge();
 
@@ -66,9 +66,9 @@ public class ResultMergeLocalAutomatic extends ResultMerge
                long cols = mc.getCols();
                
                if( OptimizerRuleBased.isInMemoryResultMerge(par * rows, cols, 
OptimizerUtils.getLocalMemBudget()) )
-                       _rm = new ResultMergeLocalMemory( _output, _inputs, 
_outputFName );
+                       _rm = new ResultMergeLocalMemory( _output, _inputs, 
_outputFName, _isAccum );
                else
-                       _rm = new ResultMergeLocalFile( _output, _inputs, 
_outputFName );
+                       _rm = new ResultMergeLocalFile( _output, _inputs, 
_outputFName, _isAccum );
                
                return _rm.executeParallelMerge(par);
        }

http://git-wip-us.apache.org/repos/asf/systemml/blob/47498514/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java
index af77783..049dbf1 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java
@@ -77,9 +77,9 @@ public class ResultMergeLocalFile extends ResultMerge
        //internal comparison matrix
        private IDSequence _seq = null;
        
-       public ResultMergeLocalFile( MatrixObject out, MatrixObject[] in, 
String outputFilename )
+       public ResultMergeLocalFile( MatrixObject out, MatrixObject[] in, 
String outputFilename, boolean accum )
        {
-               super( out, in, outputFilename );
+               super( out, in, outputFilename, accum );
                
                _seq = new IDSequence();
        }
@@ -121,7 +121,7 @@ public class ResultMergeLocalFile extends ResultMerge
                                merge( _outputFName, _output, inMO );
                                
                                //create new output matrix (e.g., to prevent 
potential export<->read file access conflict
-                               moNew = createNewMatrixObject( _output, inMO ); 
+                               moNew = createNewMatrixObject( _output, inMO );
                        }
                        else
                        {
@@ -146,7 +146,7 @@ public class ResultMergeLocalFile extends ResultMerge
                return executeSerialMerge();
        }
 
-       private MatrixObject createNewMatrixObject(MatrixObject output, 
ArrayList<MatrixObject> inMO ) 
+       private MatrixObject createNewMatrixObject(MatrixObject output, 
ArrayList<MatrixObject> inMO)
                throws DMLRuntimeException
        {
                MetaDataFormat metadata = (MetaDataFormat) 
_output.getMetaData();
@@ -156,9 +156,8 @@ public class ResultMergeLocalFile extends ResultMerge
                MatrixCharacteristics mcOld = 
metadata.getMatrixCharacteristics();
                OutputInfo oiOld = metadata.getOutputInfo();
                InputInfo iiOld = metadata.getInputInfo();
-               MatrixCharacteristics mc = new 
MatrixCharacteristics(mcOld.getRows(),mcOld.getCols(),
-                                                                            
mcOld.getRowsPerBlock(),mcOld.getColsPerBlock());
-               mc.setNonZeros( computeNonZeros(output, inMO) );
+               MatrixCharacteristics mc = new MatrixCharacteristics(mcOld);
+               mc.setNonZeros(_isAccum ? -1 : computeNonZeros(output, inMO));
                MetaDataFormat meta = new MetaDataFormat(mc,oiOld,iiOld);
                moNew.setMetaData( meta );
                

http://git-wip-us.apache.org/repos/asf/systemml/blob/47498514/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java
index 991baca..a59e81c 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java
@@ -45,8 +45,8 @@ public class ResultMergeLocalMemory extends ResultMerge
        //internal comparison matrix
        private DenseBlock _compare = null;
        
-       public ResultMergeLocalMemory( MatrixObject out, MatrixObject[] in, 
String outputFilename ) {
-               super( out, in, outputFilename );
+       public ResultMergeLocalMemory( MatrixObject out, MatrixObject[] in, 
String outputFilename, boolean accum ) {
+               super( out, in, outputFilename, accum );
        }
        
        @Override

http://git-wip-us.apache.org/repos/asf/systemml/blob/47498514/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
index 7ea1543..4b1bb0e 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
@@ -70,9 +70,10 @@ public class ResultMergeRemoteMR extends ResultMerge
        //private int  _max_retry = -1;
        private boolean _jvmReuse = false;
 
-       public ResultMergeRemoteMR(MatrixObject out, MatrixObject[] in, String 
outputFilename, long pfid, int numMappers, int numReducers, int replication, 
int max_retry, boolean jvmReuse) 
+       public ResultMergeRemoteMR(MatrixObject out, MatrixObject[] in, String 
outputFilename, boolean accum,
+               long pfid, int numMappers, int numReducers, int replication, 
int max_retry, boolean jvmReuse) 
        {
-               super(out, in, outputFilename);
+               super(out, in, outputFilename, accum);
                
                _pfid = pfid;
                _numMappers = numMappers;
@@ -136,9 +137,8 @@ public class ResultMergeRemoteMR extends ResultMerge
                                moNew = new 
MatrixObject(_output.getValueType(), _outputFName);
                                OutputInfo oiOld = metadata.getOutputInfo();
                                InputInfo iiOld = metadata.getInputInfo();
-                               MatrixCharacteristics mc = new 
MatrixCharacteristics(mcOld.getRows(),mcOld.getCols(),
-                                                                               
             mcOld.getRowsPerBlock(),mcOld.getColsPerBlock());
-                               mc.setNonZeros( computeNonZeros(_output, inMO) 
);
+                               MatrixCharacteristics mc = new 
MatrixCharacteristics(mcOld);
+                               mc.setNonZeros(_isAccum ? -1 : 
computeNonZeros(_output, inMO));
                                MetaDataFormat meta = new 
MetaDataFormat(mc,oiOld,iiOld);
                                moNew.setMetaData( meta );
                        }

http://git-wip-us.apache.org/repos/asf/systemml/blob/47498514/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java
index 2b64bb2..804e490 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java
@@ -54,9 +54,10 @@ public class ResultMergeRemoteSpark extends ResultMerge
        private int  _numMappers = -1;
        private int  _numReducers = -1;
        
-       public ResultMergeRemoteSpark(MatrixObject out, MatrixObject[] in, 
String outputFilename, ExecutionContext ec, int numMappers, int numReducers) 
+       public ResultMergeRemoteSpark(MatrixObject out, MatrixObject[] in, 
String outputFilename, boolean accum,
+               ExecutionContext ec, int numMappers, int numReducers) 
        {
-               super(out, in, outputFilename);
+               super(out, in, outputFilename, accum);
                
                _ec = ec;
                _numMappers = numMappers;
@@ -83,8 +84,7 @@ public class ResultMergeRemoteSpark extends ResultMerge
 
                try
                {
-                       if( _inputs != null && _inputs.length>0 )
-                       {
+                       if( _inputs != null && _inputs.length>0 ) {
                                //prepare compare
                                MetaDataFormat metadata = (MetaDataFormat) 
_output.getMetaData();
                                MatrixCharacteristics mcOld = 
metadata.getMatrixCharacteristics();
@@ -97,24 +97,21 @@ public class ResultMergeRemoteSpark extends ResultMerge
                                moNew = new 
MatrixObject(_output.getValueType(), _outputFName);
                                OutputInfo oiOld = metadata.getOutputInfo();
                                InputInfo iiOld = metadata.getInputInfo();
-                               MatrixCharacteristics mc = new 
MatrixCharacteristics(mcOld.getRows(),mcOld.getCols(),
-                                                                               
             mcOld.getRowsPerBlock(),mcOld.getColsPerBlock());
-                               mc.setNonZeros( computeNonZeros(_output, 
Arrays.asList(_inputs)) );
+                               MatrixCharacteristics mc = new 
MatrixCharacteristics(mcOld);
+                               mc.setNonZeros(_isAccum ? -1 : 
computeNonZeros(_output, Arrays.asList(_inputs)));
                                MetaDataFormat meta = new 
MetaDataFormat(mc,oiOld,iiOld);
                                moNew.setMetaData( meta );
                                moNew.setRDDHandle( ro );
                        }
-                       else
-                       {
+                       else {
                                moNew = _output; //return old matrix, to 
prevent copy
                        }
                }
-               catch(Exception ex)
-               {
+               catch(Exception ex) {
                        throw new DMLRuntimeException(ex);
                }
                
-               return moNew;           
+               return moNew;
        }
 
        @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/systemml/blob/47498514/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
index f517bda..2a1fd19 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
@@ -26,6 +26,7 @@ import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.lops.LopProperties;
 import org.apache.sysml.parser.ParForStatementBlock;
+import org.apache.sysml.parser.ParForStatementBlock.ResultVar;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.LocalVariableMap;
 import org.apache.sysml.runtime.controlprogram.ParForProgramBlock;
@@ -167,7 +168,7 @@ public class OptimizerConstrained extends OptimizerRuleBased
                        super.rewriteSetTranposeSparseVectorOperations(pn, 
partitionedMatrices, ec.getVariables());
 
                        //rewrite 14:
-                       HashSet<String> inplaceResultVars = new HashSet<>();
+                       HashSet<ResultVar> inplaceResultVars = new HashSet<>();
                        super.rewriteSetInPlaceResultIndexing(pn, M1, 
ec.getVariables(), inplaceResultVars, ec);
 
                        //rewrite 15:
@@ -183,7 +184,7 @@ public class OptimizerConstrained extends OptimizerRuleBased
                        rewriteSetTaskPartitioner( pn, false, false ); 
//flagLIX always false 
 
                        // rewrite 14: set in-place result indexing
-                       HashSet<String> inplaceResultVars = new HashSet<>();
+                       HashSet<ResultVar> inplaceResultVars = new HashSet<>();
                        super.rewriteSetInPlaceResultIndexing(pn, M1, 
ec.getVariables(), inplaceResultVars, ec);
 
                        if( !OptimizerUtils.isSparkExecutionMode() ) {

http://git-wip-us.apache.org/repos/asf/systemml/blob/47498514/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
index c9cb833..9fc0dbe 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
@@ -21,6 +21,7 @@ package org.apache.sysml.runtime.controlprogram.parfor.opt;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -62,6 +63,7 @@ import org.apache.sysml.parser.FunctionStatementBlock;
 import org.apache.sysml.parser.LanguageException;
 import org.apache.sysml.parser.ParForStatement;
 import org.apache.sysml.parser.ParForStatementBlock;
+import org.apache.sysml.parser.ParForStatementBlock.ResultVar;
 import org.apache.sysml.parser.StatementBlock;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.ForProgramBlock;
@@ -288,7 +290,7 @@ public class OptimizerRuleBased extends Optimizer
                        rewriteSetTranposeSparseVectorOperations(pn, 
partitionedMatrices, ec.getVariables());
                        
                        // rewrite 14: set in-place result indexing
-                       HashSet<String> inplaceResultVars = new HashSet<>();
+                       HashSet<ResultVar> inplaceResultVars = new HashSet<>();
                        rewriteSetInPlaceResultIndexing(pn, M1, 
ec.getVariables(), inplaceResultVars, ec);
                        
                        // rewrite 15: disable caching
@@ -303,7 +305,7 @@ public class OptimizerRuleBased extends Optimizer
                        rewriteSetTaskPartitioner( pn, false, false ); 
//flagLIX always false 
                        
                        // rewrite 14: set in-place result indexing
-                       HashSet<String> inplaceResultVars = new HashSet<>();
+                       HashSet<ResultVar> inplaceResultVars = new HashSet<>();
                        rewriteSetInPlaceResultIndexing(pn, M1, 
ec.getVariables(), inplaceResultVars, ec);
                        
                        if( !OptimizerUtils.isSparkExecutionMode() ) {
@@ -623,7 +625,7 @@ public class OptimizerRuleBased extends Optimizer
                return apply;
        }
 
-       protected boolean isResultPartitionableAll( Collection<OptNode> nlist, 
ArrayList<String> resultVars, LocalVariableMap vars, String iterVarname ) 
+       protected boolean isResultPartitionableAll( Collection<OptNode> nlist, 
ArrayList<ResultVar> resultVars, LocalVariableMap vars, String iterVarname ) 
                throws DMLRuntimeException
        {
                boolean ret = true;
@@ -637,7 +639,7 @@ public class OptimizerRuleBased extends Optimizer
                return ret;
        }
 
-       protected boolean isResultPartitionable( OptNode n, ArrayList<String> 
resultVars, LocalVariableMap vars, String iterVarname ) 
+       protected boolean isResultPartitionable( OptNode n, 
ArrayList<ResultVar> resultVars, LocalVariableMap vars, String iterVarname ) 
                throws DMLRuntimeException
        {
                boolean ret = true;
@@ -1663,7 +1665,7 @@ public class OptimizerRuleBased extends Optimizer
        //REWRITE set in-place result indexing
        ///
 
-       protected void rewriteSetInPlaceResultIndexing(OptNode pn, double M, 
LocalVariableMap vars, HashSet<String> inPlaceResultVars, ExecutionContext ec) 
+       protected void rewriteSetInPlaceResultIndexing(OptNode pn, double M, 
LocalVariableMap vars, HashSet<ResultVar> inPlaceResultVars, ExecutionContext 
ec) 
                throws DMLRuntimeException 
        {
                //assertions (warnings of corrupt optimizer decisions)
@@ -1677,14 +1679,14 @@ public class OptimizerRuleBased extends Optimizer
                
                //note currently we decide for all result vars jointly, i.e.,
                //only if all fit pinned in remaining budget, we apply this 
rewrite.
-               ArrayList<String> retVars = pfpb.getResultVariables();
+               ArrayList<ResultVar> retVars = pfpb.getResultVariables();
                
                //compute total sum of pinned result variable memory
                double sum = computeTotalSizeResultVariables(retVars, vars, 
pfpb.getDegreeOfParallelism());
                
                //NOTE: currently this rule is too conservative (the result 
variable is assumed to be dense and
                //most importantly counted twice if this is part of the maximum 
operation)
-               double totalMem = Math.max((M+sum), 
rComputeSumMemoryIntermediates(pn, new HashSet<String>()));
+               double totalMem = Math.max((M+sum), 
rComputeSumMemoryIntermediates(pn, new HashSet<ResultVar>()));
                
                //optimization decision
                if( rHasOnlyInPlaceSafeLeftIndexing(pn, retVars) ) //basic 
correctness constraint
@@ -1710,8 +1712,8 @@ public class OptimizerRuleBased extends Optimizer
                {
                        //add result vars to result and set state
                        //will be serialized and transfered via symbol table 
-                       for( String var : retVars ){
-                               Data dat = vars.get(var);
+                       for( ResultVar var : retVars ){
+                               Data dat = vars.get(var._name);
                                if( dat instanceof MatrixObject )
                                        
((MatrixObject)dat).setUpdateType(UpdateType.INPLACE_PINNED);
                        }
@@ -1719,10 +1721,10 @@ public class OptimizerRuleBased extends Optimizer
                }
                
                LOG.debug(getOptMode()+" OPT: rewrite 'set in-place result 
indexing' - result="+
-                       apply+" 
("+ProgramConverter.serializeStringCollection(inPlaceResultVars)+", 
M="+toMB(totalMem)+")" );
+                       apply+" 
("+Arrays.toString(inPlaceResultVars.toArray(new ResultVar[0]))+", 
M="+toMB(totalMem)+")" );
        }
        
-       protected boolean rHasOnlyInPlaceSafeLeftIndexing( OptNode n, 
ArrayList<String> retVars ) 
+       protected boolean rHasOnlyInPlaceSafeLeftIndexing( OptNode n, 
ArrayList<ResultVar> retVars ) 
                throws DMLRuntimeException
        {
                boolean ret = true;
@@ -1739,10 +1741,10 @@ public class OptimizerRuleBased extends Optimizer
                return ret;
        }
 
-       private static double computeTotalSizeResultVariables(ArrayList<String> 
retVars, LocalVariableMap vars, int k) {
+       private static double 
computeTotalSizeResultVariables(ArrayList<ResultVar> retVars, LocalVariableMap 
vars, int k) {
                double sum = 1;
-               for( String var : retVars ) {
-                       Data dat = vars.get(var);
+               for( ResultVar var : retVars ) {
+                       Data dat = vars.get(var._name);
                        if( !(dat instanceof MatrixObject) )
                                continue;
                        MatrixObject mo = (MatrixObject)dat;
@@ -1762,7 +1764,7 @@ public class OptimizerRuleBased extends Optimizer
        //REWRITE disable CP caching  
        ///
 
-       protected void rewriteDisableCPCaching(OptNode pn, HashSet<String> 
inplaceResultVars, LocalVariableMap vars) 
+       protected void rewriteDisableCPCaching(OptNode pn, HashSet<ResultVar> 
inplaceResultVars, LocalVariableMap vars) 
                throws DMLRuntimeException 
        {
                //assertions (warnings of corrupt optimizer decisions)
@@ -1784,7 +1786,7 @@ public class OptimizerRuleBased extends Optimizer
                LOG.debug(getOptMode()+" OPT: rewrite 'disable CP caching' - 
result="+apply+" (M="+toMB(M_sumInterm)+")" );
        }
 
-       protected double rComputeSumMemoryIntermediates( OptNode n, 
HashSet<String> inplaceResultVars )
+       protected double rComputeSumMemoryIntermediates( OptNode n, 
HashSet<ResultVar> inplaceResultVars )
                throws DMLRuntimeException
        {
                double sum = 0;
@@ -2061,24 +2063,24 @@ public class OptimizerRuleBased extends Optimizer
                ParForProgramBlock pfpb = (ParForProgramBlock) OptTreeConverter
                            
.getAbstractPlanMapping().getMappedProg(n.getID())[1];
 
-               ArrayList<String> cleanedVars = new ArrayList<>();
-               ArrayList<String> resultVars = pfpb.getResultVariables();
+               ArrayList<ResultVar> cleanedVars = new ArrayList<>();
+               ArrayList<ResultVar> resultVars = pfpb.getResultVariables();
                String itervar = pfpb.getIterVar();
                
-               for( String rvar : resultVars ) {
-                       Data dat = ec.getVariable(rvar);
+               for( ResultVar rvar : resultVars ) {
+                       Data dat = ec.getVariable(rvar._name);
                        if( dat instanceof MatrixObject && 
((MatrixObject)dat).getNnz()!=0     //subject to result merge with compare
                                && n.hasOnlySimpleChilds()                      
                   //guaranteed no conditional indexing 
-                               && rContainsResultFullReplace(n, rvar, itervar, 
(MatrixObject)dat) //guaranteed full matrix replace 
+                               && rContainsResultFullReplace(n, rvar._name, 
itervar, (MatrixObject)dat) //guaranteed full matrix replace 
                                //&& 
!pfsb.variablesRead().containsVariable(rvar)                  //never read 
variable in loop body
-                               && !rIsReadInRightIndexing(n, rvar)             
                   //never read variable in loop body
+                               && !rIsReadInRightIndexing(n, rvar._name)       
                   //never read variable in loop body
                                && 
((MatrixObject)dat).getNumRows()<=Integer.MAX_VALUE
                                && 
((MatrixObject)dat).getNumColumns()<=Integer.MAX_VALUE )
                        {
                                //replace existing matrix object with empty 
matrix
                                MatrixObject mo = (MatrixObject)dat;
                                ec.cleanupCacheableData(mo);
-                               ec.setMatrixOutput(rvar, new 
MatrixBlock((int)mo.getNumRows(), (int)mo.getNumColumns(),false), null);
+                               ec.setMatrixOutput(rvar._name, new 
MatrixBlock((int)mo.getNumRows(), (int)mo.getNumColumns(),false), null);
                                
                                //keep track of cleaned result variables
                                cleanedVars.add(rvar);
@@ -2086,7 +2088,8 @@ public class OptimizerRuleBased extends Optimizer
                }
 
                _numEvaluatedPlans++;
-               LOG.debug(getOptMode()+" OPT: rewrite 'remove unnecessary 
compare matrix' - result="+(!cleanedVars.isEmpty())+" 
("+ProgramConverter.serializeStringCollection(cleanedVars)+")" );
+               LOG.debug(getOptMode()+" OPT: rewrite 'remove unnecessary 
compare matrix' - result="+(!cleanedVars.isEmpty())
+                       +" 
("+ProgramConverter.serializeResultVariables(cleanedVars)+")" );
        }
 
        protected boolean rContainsResultFullReplace( OptNode n, String 
resultVar, String iterVarname, MatrixObject mo ) 
@@ -2231,13 +2234,13 @@ public class OptimizerRuleBased extends Optimizer
                LOG.debug(getOptMode()+" OPT: rewrite 'set result merge' - 
result="+ret );
        }
 
-       protected boolean determineFlagCellFormatWoCompare( ArrayList<String> 
resultVars, LocalVariableMap vars  )
+       protected boolean determineFlagCellFormatWoCompare( 
ArrayList<ResultVar> resultVars, LocalVariableMap vars  )
        {
                boolean ret = true;
                
-               for( String rVar : resultVars )
+               for( ResultVar rVar : resultVars )
                {
-                       Data dat = vars.get(rVar);
+                       Data dat = vars.get(rVar._name);
                        if( dat == null || !(dat instanceof MatrixObject) )
                        {
                                ret = false; 
@@ -2250,8 +2253,7 @@ public class OptimizerRuleBased extends Optimizer
                                OutputInfo oi = meta.getOutputInfo();
                                long nnz = 
meta.getMatrixCharacteristics().getNonZeros();
                                
-                               if( oi == OutputInfo.BinaryBlockOutputInfo || 
nnz != 0 )
-                               {
+                               if( oi == OutputInfo.BinaryBlockOutputInfo || 
nnz != 0 ) {
                                        ret = false; 
                                        break;
                                }
@@ -2261,7 +2263,7 @@ public class OptimizerRuleBased extends Optimizer
                return ret;
        }
 
-       protected boolean hasResultMRLeftIndexing( OptNode n, ArrayList<String> 
resultVars, LocalVariableMap vars, boolean checkSize ) 
+       protected boolean hasResultMRLeftIndexing( OptNode n, 
ArrayList<ResultVar> resultVars, LocalVariableMap vars, boolean checkSize ) 
                throws DMLRuntimeException
        {
                boolean ret = false;
@@ -2309,7 +2311,7 @@ public class OptimizerRuleBased extends Optimizer
         * @return true if result sizes larger than local memory budget
         * @throws DMLRuntimeException if DMLRuntimeException occurs
         */
-       protected boolean hasLargeTotalResults( OptNode pn, ArrayList<String> 
resultVars, LocalVariableMap vars, boolean checkSize ) 
+       protected boolean hasLargeTotalResults( OptNode pn, 
ArrayList<ResultVar> resultVars, LocalVariableMap vars, boolean checkSize ) 
                throws DMLRuntimeException
        {
                double totalSize = 0;
@@ -2319,17 +2321,17 @@ public class OptimizerRuleBased extends Optimizer
                int k = pn.getK();
                long W = estimateNumTasks(tp, _N, k); 
                
-               for( String var : resultVars )
+               for( ResultVar var : resultVars )
                {
                        //Potential unknowns: for local result var of child 
parfor (but we're only interested in top level)
                        //Potential scalars: for disabled dependency analysis 
and unbounded scoping                     
-                       Data dat = vars.get( var );
+                       Data dat = vars.get( var._name );
                        if( dat != null && dat instanceof MatrixObject ) 
                        {
-                               MatrixObject mo = (MatrixObject) vars.get( var 
);
+                               MatrixObject mo = (MatrixObject) dat;
                                
                                long rows = mo.getNumRows();
-                               long cols = mo.getNumColumns(); 
+                               long cols = mo.getNumColumns();
                                long nnz = mo.getNnz();
                                
                                if( nnz > 0 ) //w/ compare
@@ -2364,7 +2366,7 @@ public class OptimizerRuleBased extends Optimizer
                return W;
        }
 
-       protected boolean hasOnlyInMemoryResults( OptNode n, ArrayList<String> 
resultVars, LocalVariableMap vars, boolean inLocal ) 
+       protected boolean hasOnlyInMemoryResults( OptNode n, 
ArrayList<ResultVar> resultVars, LocalVariableMap vars, boolean inLocal ) 
                throws DMLRuntimeException
        {
                boolean ret = true;

http://git-wip-us.apache.org/repos/asf/systemml/blob/47498514/src/main/java/org/apache/sysml/runtime/instructions/InstructionUtils.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/InstructionUtils.java 
b/src/main/java/org/apache/sysml/runtime/instructions/InstructionUtils.java
index 85a45ff..ff85aa7 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/InstructionUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/InstructionUtils.java
@@ -528,7 +528,6 @@ public class InstructionUtils
        }
        
        public static BinaryOperator parseBinaryOperator(String opcode) 
-               throws DMLRuntimeException
        {
                if(opcode.equalsIgnoreCase("=="))
                        return new BinaryOperator(Equals.getEqualsFnObject());
@@ -583,7 +582,7 @@ public class InstructionUtils
                else if ( opcode.equalsIgnoreCase("min") ) 
                        return new 
BinaryOperator(Builtin.getBuiltinFnObject("min"));
                
-               throw new DMLRuntimeException("Unknown binary opcode " + 
opcode);
+               throw new RuntimeException("Unknown binary opcode " + opcode);
        }
        
        public static TernaryOperator parseTernaryOperator(String opcode) {

http://git-wip-us.apache.org/repos/asf/systemml/blob/47498514/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
index 432adec..e06c8c1 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
@@ -630,12 +630,6 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
                        return 0;
                return denseBlock.get(r, c);
        }
-
-       public double getValueSparseUnsafe(int r, int c) {
-               if(sparseBlock==null || sparseBlock.isEmpty(r))
-                       return 0;
-               return sparseBlock.get(r, c);
-       }
        
        /**
         * Append value is only used when values are appended at the end of 
each row for the sparse representation

http://git-wip-us.apache.org/repos/asf/systemml/blob/47498514/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForAccumulatorResultMergeTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForAccumulatorResultMergeTest.java
 
b/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForAccumulatorResultMergeTest.java
new file mode 100644
index 0000000..7936c26
--- /dev/null
+++ 
b/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForAccumulatorResultMergeTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.parfor;
+
+import java.util.HashMap;
+
+import org.junit.Test;
+import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
+import org.apache.sysml.lops.LopProperties.ExecType;
+import org.apache.sysml.runtime.matrix.data.MatrixValue.CellIndex;
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.integration.TestConfiguration;
+import org.apache.sysml.test.utils.TestUtils;
+
+public class ParForAccumulatorResultMergeTest extends AutomatedTestBase 
+{
+       private final static String TEST_DIR = "functions/parfor/";
+       private final static String TEST_NAME1 = "parfor_accumulator1"; //local 
merge
+       private final static String TEST_NAME2 = "parfor_accumulator2"; 
//remote MR merge
+       private final static String TEST_NAME3 = "parfor_accumulator3"; 
//remote SPARK merge
+       
+       private final static String TEST_CLASS_DIR = TEST_DIR + 
ParForAccumulatorResultMergeTest.class.getSimpleName() + "/";
+       
+       private final static double eps = 0;
+       private final static int rows = 1210;
+       private final static int cols = 345;
+       
+       
+       @Override
+       public void setUp() {
+               addTestConfiguration(TEST_NAME1, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] { "R" }) );
+               addTestConfiguration(TEST_NAME2, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME2, new String[] { "R" }) );
+               addTestConfiguration(TEST_NAME3, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME3, new String[] { "R" }) );
+       }
+
+       @Test
+       public void testParForAccumulatorLocalEmptyDense() {
+               runParForAccumulatorResultMergeTest(TEST_NAME1, false, false, 
ExecType.CP);
+       }
+       
+       @Test
+       public void testParForAccumulatorLocalEmptySparse() {
+               runParForAccumulatorResultMergeTest(TEST_NAME1, false, true, 
ExecType.CP);
+       }
+       
+       @Test
+       public void testParForAccumulatorLocalInitDense() {
+               runParForAccumulatorResultMergeTest(TEST_NAME1, true, false, 
ExecType.CP);
+       }
+       
+       @Test
+       public void testParForAccumulatorLocalInitSparse() {
+               runParForAccumulatorResultMergeTest(TEST_NAME1, true, true, 
ExecType.CP);
+       }
+
+       private void runParForAccumulatorResultMergeTest( String test, boolean 
init, boolean sparse, ExecType et )
+       {
+               RUNTIME_PLATFORM platformOld = rtplatform;
+               switch( et ) {
+                       case CP: rtplatform = RUNTIME_PLATFORM.SINGLE_NODE; 
break;
+                       case MR: rtplatform = RUNTIME_PLATFORM.HYBRID; break;
+                       case SPARK: rtplatform = RUNTIME_PLATFORM.HYBRID_SPARK; 
break;
+                       default: throw new RuntimeException("Unsupported exec 
type: "+et.name());
+               }
+               boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+               if( et == ExecType.SPARK )
+                       DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+               
+               try {
+                       String TEST_NAME = test;
+                       TestConfiguration config = 
getTestConfiguration(TEST_NAME);
+                       config.addVariable("rows", rows);
+                       config.addVariable("cols", cols);
+                       loadTestConfiguration(config);
+                       
+                       String HOME = SCRIPT_DIR + TEST_DIR;
+                       fullDMLScriptName = HOME + TEST_NAME + ".dml";
+                       programArgs = new String[]{"-args", 
+                               String.valueOf(rows), String.valueOf(cols), 
String.valueOf(init).toUpperCase(),
+                               String.valueOf(sparse).toUpperCase(), 
output("R") };
+                       
+                       fullRScriptName = HOME + TEST_NAME + ".R";
+                       rCmd = "Rscript" + " " + fullRScriptName + " " + 
+                               String.valueOf(rows) + " " + 
String.valueOf(cols) + " " + String.valueOf(init).toUpperCase() 
+                                       + " " + 
String.valueOf(sparse).toUpperCase() + " " + expectedDir();
+       
+                       //run tests
+                       runTest(true, false, null, -1);
+                       runRScript(true);
+               
+                       //compare matrices
+                       HashMap<CellIndex, Double> dmlfile = 
readDMLMatrixFromHDFS("R");
+                       HashMap<CellIndex, Double> rfile  = 
readRMatrixFromFS("R");
+                       TestUtils.compareMatrices(dmlfile, rfile, eps, "DML", 
"R");
+               }
+               finally {
+                       rtplatform = platformOld;
+                       DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/systemml/blob/47498514/src/test/scripts/functions/parfor/parfor_accumulator1.R
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/parfor/parfor_accumulator1.R 
b/src/test/scripts/functions/parfor/parfor_accumulator1.R
new file mode 100644
index 0000000..a1c587c
--- /dev/null
+++ b/src/test/scripts/functions/parfor/parfor_accumulator1.R
@@ -0,0 +1,39 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# Expected-result script: sequential R counterpart of parfor_accumulator1.dml.
+args <- commandArgs(TRUE)
+options(digits=22)
+# Matrix package provides writeMM and the CsparseMatrix class used below.
+library("Matrix")
+# Arguments: rows, cols, init flag, sparse flag, output directory.
+rlen = as.integer(args[1]);
+clen = as.integer(args[2]);
+# Base matrix: all 7s if init, else all 0s; if sparse, columns 50-300 are zeroed.
+R = matrix(ifelse(as.logical(args[3]), 7, 0), rlen, clen);
+if( as.logical(args[4]) ) {
+   R[,50:300] = matrix(0, rlen, 251);  # 251 = length(50:300); requires clen >= 300
+}
+for(i in 1:10) {  # sequential equivalent of the DML parfor += accumulation
+   R = R + matrix(i, rlen, clen);
+}
+# Every cell ends at its base value + 55 (= 1 + 2 + ... + 10); written as Matrix Market to <dir>R.
+writeMM(as(R, "CsparseMatrix"), paste(args[5], "R", sep=""));

http://git-wip-us.apache.org/repos/asf/systemml/blob/47498514/src/test/scripts/functions/parfor/parfor_accumulator1.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/parfor/parfor_accumulator1.dml b/src/test/scripts/functions/parfor/parfor_accumulator1.dml
new file mode 100644
index 0000000..26a090d
--- /dev/null
+++ b/src/test/scripts/functions/parfor/parfor_accumulator1.dml
@@ -0,0 +1,34 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+# Parfor test for += (accumulator) result merge; args: rows, cols, init, sparse, output path.
+rlen = $1;
+clen = $2;
+init = $3
+sparse = $4;
+# Base matrix: all 7s if init, else all 0s; if sparse, columns 50-300 are zeroed.
+R = matrix(ifelse(init, 7, 0), rlen, clen);
+if( sparse )
+   R[,50:300] = matrix(0, rlen, 251);
+# Each iteration adds matrix(i) to R; R is a += result variable merged locally.
+parfor(i in 1:10, opt=CONSTRAINED, resultmerge=LOCAL_AUTOMATIC)
+   R += matrix(i, rlen, clen);
+# Every cell should end at its base value + 55 (= 1 + 2 + ... + 10).
+write(R, $5);

http://git-wip-us.apache.org/repos/asf/systemml/blob/47498514/src/test_suites/java/org/apache/sysml/test/integration/functions/parfor/ZPackageSuite.java
----------------------------------------------------------------------
diff --git a/src/test_suites/java/org/apache/sysml/test/integration/functions/parfor/ZPackageSuite.java b/src/test_suites/java/org/apache/sysml/test/integration/functions/parfor/ZPackageSuite.java
index c898a7a..8844be2 100644
--- a/src/test_suites/java/org/apache/sysml/test/integration/functions/parfor/ZPackageSuite.java
+++ b/src/test_suites/java/org/apache/sysml/test/integration/functions/parfor/ZPackageSuite.java
@@ -27,6 +27,7 @@ import org.junit.runners.Suite;
 @RunWith(Suite.class)
 @Suite.SuiteClasses({
        ForLoopPredicateTest.class,
+       ParForAccumulatorResultMergeTest.class,
        ParForAdversarialLiteralsTest.class,
        ParForBlockwiseDataPartitioningTest.class,
        ParForColwiseDataPartitioningTest.class,

Reply via email to