Repository: systemml Updated Branches: refs/heads/master c2dd05e51 -> 00d72a092
[SYSTEMML-2095] Extended local parfor result merge w/ accumulators This patch realizes the runtime integration of accumulator result variables and extends all local result merge implementations accordingly. For += result merge w/o compare, we forward the core operation to inplace binary plus operations, while for result merge w/ compare, we extend the existing implementation. Specifically, we add currentValue + (newValue - oldValue) to avoid double counting. Furthermore, this also includes new tests for parfor with different types of += result merge. Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/47498514 Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/47498514 Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/47498514 Branch: refs/heads/master Commit: 474985143589e363aa58d215fce6abdc8d721bd0 Parents: c2dd05e Author: Matthias Boehm <[email protected]> Authored: Sat Jan 27 21:49:46 2018 -0800 Committer: Matthias Boehm <[email protected]> Committed: Sat Jan 27 21:49:46 2018 -0800 ---------------------------------------------------------------------- .../sysml/parser/ParForStatementBlock.java | 67 +++++++---- .../controlprogram/ParForProgramBlock.java | 61 +++++----- .../controlprogram/parfor/ParForBody.java | 45 +++---- .../controlprogram/parfor/ParWorker.java | 42 +++---- .../controlprogram/parfor/ProgramConverter.java | 40 +++++-- .../parfor/RemoteDPParForSparkWorker.java | 2 +- .../parfor/RemoteDPParWorkerReducer.java | 10 +- .../parfor/RemoteParForSparkWorker.java | 6 +- .../parfor/RemoteParForUtils.java | 37 +++--- .../parfor/RemoteParWorkerMapper.java | 6 +- .../controlprogram/parfor/ResultMerge.java | 45 ++++--- .../parfor/ResultMergeLocalAutomatic.java | 12 +- .../parfor/ResultMergeLocalFile.java | 13 +- .../parfor/ResultMergeLocalMemory.java | 4 +- .../parfor/ResultMergeRemoteMR.java | 10 +- .../parfor/ResultMergeRemoteSpark.java | 21 ++-- .../parfor/opt/OptimizerConstrained.java | 5 +- .../parfor/opt/OptimizerRuleBased.java | 74 ++++++------ .../runtime/instructions/InstructionUtils.java | 3 +- .../sysml/runtime/matrix/data/MatrixBlock.java | 6 - .../ParForAccumulatorResultMergeTest.java | 119 +++++++++++++++++++ .../functions/parfor/parfor_accumulator1.R | 39 ++++++ .../functions/parfor/parfor_accumulator1.dml | 34 ++++++ .../functions/parfor/ZPackageSuite.java | 1 + 24 files changed, 449 insertions(+), 253 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/47498514/src/main/java/org/apache/sysml/parser/ParForStatementBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/parser/ParForStatementBlock.java b/src/main/java/org/apache/sysml/parser/ParForStatementBlock.java index 69027a5..6a39c7a 100644 --- a/src/main/java/org/apache/sysml/parser/ParForStatementBlock.java +++ b/src/main/java/org/apache/sysml/parser/ParForStatementBlock.java @@ -93,9 +93,9 @@ public class ParForStatementBlock extends ForStatementBlock //instance members private final long _ID; - private VariableSet _vsParent = null; - private ArrayList<String> _resultVars = null; - private Bounds _bounds = null; + private VariableSet _vsParent = null; + private ArrayList<ResultVar> _resultVars = null; + private Bounds _bounds = null; static { @@ -159,11 +159,15 @@ public class ParForStatementBlock extends ForStatementBlock return _ID; } - public ArrayList<String> getResultVariables() { + public ArrayList<ResultVar> getResultVariables() { return _resultVars; } - private void addToResultVariablesNoDup( String var ) { + private void addToResultVariablesNoDup( String var, boolean accum ) { + addToResultVariablesNoDup(new ResultVar(var, accum)); + } + + private void addToResultVariablesNoDup( ResultVar var ) { if( !_resultVars.contains( var ) ) _resultVars.add( var ); } @@ -344,16 +348,16 @@ public class ParForStatementBlock extends ForStatementBlock //a) add own candidates for( Candidate var : C ) if( check || var._dat.getDataType()!=DataType.SCALAR ) - addToResultVariablesNoDup( var._var ); + addToResultVariablesNoDup( var._var, var._isAccum ); //b) get and add child result vars (if required) - ArrayList<String> tmp = new ArrayList<>(); + ArrayList<ResultVar> tmp = new ArrayList<>(); rConsolidateResultVars(pfs.getBody(), tmp); - for( String var : tmp ) - if(_vsParent.containsVariable(var)) - addToResultVariablesNoDup( var ); + for( ResultVar var : tmp ) + if(_vsParent.containsVariable(var._name)) + addToResultVariablesNoDup(var); if( LDEBUG ) - for( String rvar : _resultVars ) - LOG.debug("INFO: PARFOR final result variable: "+rvar); + for( ResultVar rvar : _resultVars ) + LOG.debug("INFO: PARFOR final result variable: "+rvar._name); //cleanup function cache in order to prevent side effects between parfor statements if( USE_FN_CACHE ) @@ -594,35 +598,25 @@ public class ParForStatementBlock extends ForStatementBlock && l2.eval(1L) == l1._b[0] ); //aligned intercept } - private void rConsolidateResultVars(ArrayList<StatementBlock> asb, ArrayList<String> vars) + private void rConsolidateResultVars(ArrayList<StatementBlock> asb, ArrayList<ResultVar> vars) throws LanguageException { for(StatementBlock sb : asb ) // foreach statementblock in parforbody { if( sb instanceof ParForStatementBlock ) - { vars.addAll(((ParForStatementBlock)sb).getResultVariables()); - } - for( Statement s : sb._statements ) // foreach statement in statement block - { + for( Statement s : sb._statements ) { if( s instanceof ForStatement || s instanceof ParForStatement ) - { rConsolidateResultVars(((ForStatement)s).getBody(), vars); - } else if( s instanceof WhileStatement ) - { rConsolidateResultVars(((WhileStatement)s).getBody(), vars); - } - else if( s instanceof IfStatement ) - { + else if( s instanceof IfStatement ) { rConsolidateResultVars(((IfStatement)s).getIfBody(), vars); rConsolidateResultVars(((IfStatement)s).getElseBody(), vars); } else if( s instanceof FunctionStatement ) - { rConsolidateResultVars(((FunctionStatement)s).getBody(), vars); - } } } } @@ -1809,6 +1803,29 @@ public class ParForStatementBlock extends ForStatementBlock return ret; } + public static class ResultVar { + public final String _name; + public final boolean _isAccum; + public ResultVar(String name, boolean accum) { + _name = name; + _isAccum = accum; + } + @Override + public boolean equals(Object that) { + if( !(that instanceof ResultVar) ) + return false; + return _name.equals(((ResultVar)that)._name); + } + @Override + public int hashCode() { + return _name.hashCode(); + } + @Override + public String toString() { + return _name; + } + } + private static class Candidate { private final String _var; // variable name private final DataIdentifier _dat; // _var data identifier http://git-wip-us.apache.org/repos/asf/systemml/blob/47498514/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java index 88ce415..7348266 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java @@ -30,6 +30,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.hadoop.fs.FileSystem; @@ -49,6 +50,7 @@ import org.apache.sysml.parser.DataIdentifier; import org.apache.sysml.parser.Expression.DataType; import org.apache.sysml.parser.Expression.ValueType; import org.apache.sysml.parser.ParForStatementBlock; +import org.apache.sysml.parser.ParForStatementBlock.ResultVar; import org.apache.sysml.parser.StatementBlock; import org.apache.sysml.parser.VariableSet; import org.apache.sysml.runtime.DMLRuntimeException; @@ -355,7 +357,7 @@ public class ParForProgramBlock extends ForProgramBlock protected Collection<String> _variablesECache = null; // program block meta data - protected final ArrayList<String> _resultVars; + protected final ArrayList<ResultVar> _resultVars; protected final IDSequence _resultVarsIDSeq; protected final IDSequence _dpVarsIDSeq; protected final boolean _hasFunctions; @@ -368,7 +370,7 @@ public class ParForProgramBlock extends ForProgramBlock protected HashMap<Long,ArrayList<ProgramBlock>> _pbcache = null; protected long[] _pwIDs = null; - public ParForProgramBlock(Program prog, String iterPredVar, HashMap<String,String> params, ArrayList<String> resultVars) + public ParForProgramBlock(Program prog, String iterPredVar, HashMap<String,String> params, ArrayList<ResultVar> resultVars) throws DMLRuntimeException { this( -1, prog, iterPredVar, params, resultVars); @@ -385,7 +387,7 @@ public class ParForProgramBlock extends ForProgramBlock * @param resultVars list of result variable names * @throws DMLRuntimeException if DMLRuntimeException occurs */ - public ParForProgramBlock(int ID, Program prog, String iterPredVar, HashMap<String,String> params, ArrayList<String> resultVars) + public ParForProgramBlock(int ID, Program prog, String iterPredVar, HashMap<String,String> params, ArrayList<ResultVar> resultVars) { super(prog, iterPredVar); @@ -457,7 +459,7 @@ public class ParForProgramBlock extends ForProgramBlock UtilFunctions.unquote(tmp).toUpperCase(); } - public ArrayList<String> getResultVariables() { + public ArrayList<ResultVar> getResultVariables() { return _resultVars; } @@ -810,7 +812,8 @@ public class ParForProgramBlock extends ForProgramBlock LocalVariableMap [] localVariables = new LocalVariableMap [_numThreads]; for( int i=0; i<_numThreads; i++ ) { localVariables[i] = workers[i].getVariables(); - localVariables[i].removeAllNotIn(new HashSet<>(_resultVars)); + localVariables[i].removeAllNotIn(_resultVars.stream() + .map(v -> v._name).collect(Collectors.toSet())); numExecutedTasks += workers[i].getExecutedTasks(); numExecutedIterations += workers[i].getExecutedIterations(); } @@ -1489,7 +1492,7 @@ public class ParForProgramBlock extends ForProgramBlock return dp; } - private ResultMerge createResultMerge( PResultMerge prm, MatrixObject out, MatrixObject[] in, String fname, ExecutionContext ec ) + private ResultMerge createResultMerge( PResultMerge prm, MatrixObject out, MatrixObject[] in, String fname, boolean accum, ExecutionContext ec ) throws DMLRuntimeException { ResultMerge rm = null; @@ -1518,21 +1521,22 @@ public class ParForProgramBlock extends ForProgramBlock switch( prm ) { case LOCAL_MEM: - rm = new ResultMergeLocalMemory( out, in, fname ); + rm = new ResultMergeLocalMemory( out, in, fname, accum ); break; case LOCAL_FILE: - rm = new ResultMergeLocalFile( out, in, fname ); + rm = new ResultMergeLocalFile( out, in, fname, accum ); break; case LOCAL_AUTOMATIC: - rm = new ResultMergeLocalAutomatic( out, in, fname ); + rm = new ResultMergeLocalAutomatic( out, in, fname, accum ); break; case REMOTE_MR: - rm = new ResultMergeRemoteMR( out, in, fname, + rm = new ResultMergeRemoteMR( out, in, fname, accum, _ID, numMap, numRed, WRITE_REPLICATION_FACTOR, MAX_RETRYS_ON_ERROR, ALLOW_REUSE_MR_JVMS ); break; case REMOTE_SPARK: - rm = new ResultMergeRemoteSpark( out, in, fname, ec, numMap, numRed ); + rm = new ResultMergeRemoteSpark( out, in, + fname, accum, ec, numMap, numRed ); break; default: @@ -1657,10 +1661,11 @@ public class ParForProgramBlock extends ForProgramBlock try { //enqueue all result vars as tasks - LocalTaskQueue<String> q = new LocalTaskQueue<>(); - for( String var : _resultVars ) //foreach non-local write - if( ec.getVariable(var) instanceof MatrixObject ) //robustness scalars + LocalTaskQueue<ResultVar> q = new LocalTaskQueue<>(); + for( ResultVar var : _resultVars ) { //foreach non-local write + if( ec.getVariable(var._name) instanceof MatrixObject ) //robustness scalars q.enqueueTask(var); + } q.closeInput(); //run result merge workers @@ -1683,17 +1688,17 @@ public class ParForProgramBlock extends ForProgramBlock else { //execute result merge sequentially for all result vars - for( String var : _resultVars ) //foreach non-local write + for( ResultVar var : _resultVars ) //foreach non-local write { - Data dat = ec.getVariable(var); + Data dat = ec.getVariable(var._name); if( dat instanceof MatrixObject ) //robustness scalars { MatrixObject out = (MatrixObject) dat; MatrixObject[] in = new MatrixObject[ results.length ]; for( int i=0; i< results.length; i++ ) - in[i] = (MatrixObject) results[i].get( var ); + in[i] = (MatrixObject) results[i].get( var._name ); String fname = constructResultMergeFileName(); - ResultMerge rm = createResultMerge(_resultMerge, out, in, fname, ec); + ResultMerge rm = createResultMerge(_resultMerge, out, in, fname, var._isAccum, ec); MatrixObject outNew = null; if( USE_PARALLEL_RESULT_MERGE ) outNew = rm.executeParallelMerge( _numThreads ); @@ -1701,7 +1706,7 @@ public class ParForProgramBlock extends ForProgramBlock outNew = rm.executeSerialMerge(); //cleanup existing var - Data exdata = ec.removeVariable(var); + Data exdata = ec.removeVariable(var._name); if( exdata != null && exdata != outNew && exdata instanceof MatrixObject ) ec.cleanupCacheableData((MatrixObject)exdata); @@ -1709,7 +1714,7 @@ public class ParForProgramBlock extends ForProgramBlock cleanWorkerResultVariables( ec, out, in ); //set merged result variable - ec.setVariable(var, outNew); + ec.setVariable(var._name, outNew); } } } @@ -1907,12 +1912,12 @@ public class ParForProgramBlock extends ForProgramBlock */ private class ResultMergeWorker extends Thread { - private LocalTaskQueue<String> _q = null; + private LocalTaskQueue<ResultVar> _q = null; private LocalVariableMap[] _refVars = null; private ExecutionContext _ec = null; private boolean _success = false; - public ResultMergeWorker( LocalTaskQueue<String> q, LocalVariableMap[] results, ExecutionContext ec ) + public ResultMergeWorker( LocalTaskQueue<ResultVar> q, LocalVariableMap[] results, ExecutionContext ec ) { _q = q; _refVars = results; @@ -1930,21 +1935,21 @@ public class ParForProgramBlock extends ForProgramBlock { while( true ) { - String varname = _q.dequeueTask(); - if( varname == LocalTaskQueue.NO_MORE_TASKS ) // task queue closed (no more tasks) + ResultVar var = _q.dequeueTask(); + if( var == LocalTaskQueue.NO_MORE_TASKS ) // task queue closed (no more tasks) break; MatrixObject out = null; synchronized( _ec.getVariables() ){ - out = _ec.getMatrixObject(varname); + out = _ec.getMatrixObject(var._name); } MatrixObject[] in = new MatrixObject[ _refVars.length ]; for( int i=0; i< _refVars.length; i++ ) - in[i] = (MatrixObject) _refVars[i].get( varname ); + in[i] = (MatrixObject) _refVars[i].get( var._name ); String fname = constructResultMergeFileName(); - ResultMerge rm = createResultMerge(_resultMerge, out, in, fname, _ec); + ResultMerge rm = createResultMerge(_resultMerge, out, in, fname, var._isAccum, _ec); MatrixObject outNew = null; if( USE_PARALLEL_RESULT_MERGE ) outNew = rm.executeParallelMerge( _numThreads ); @@ -1952,7 +1957,7 @@ public class ParForProgramBlock extends ForProgramBlock outNew = rm.executeSerialMerge(); synchronized( _ec.getVariables() ){ - _ec.getVariables().put( varname, outNew); + _ec.getVariables().put( var._name, outNew); } //cleanup of intermediate result variables http://git-wip-us.apache.org/repos/asf/systemml/blob/47498514/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ParForBody.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ParForBody.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ParForBody.java index 2d78895..3ff9e74 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ParForBody.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ParForBody.java @@ -21,6 +21,7 @@ package org.apache.sysml.runtime.controlprogram.parfor; import java.util.ArrayList; +import org.apache.sysml.parser.ParForStatementBlock.ResultVar; import org.apache.sysml.runtime.controlprogram.LocalVariableMap; import org.apache.sysml.runtime.controlprogram.ProgramBlock; import org.apache.sysml.runtime.controlprogram.context.ExecutionContext; @@ -31,57 +32,45 @@ import org.apache.sysml.runtime.controlprogram.context.ExecutionContext; */ public class ParForBody { - - - private ArrayList<String> _resultVarNames; + private ArrayList<ResultVar> _resultVars; private ArrayList<ProgramBlock> _childBlocks; - private ExecutionContext _ec; + private ExecutionContext _ec; - public ParForBody() - { - - } + public ParForBody() {} public ParForBody( ArrayList<ProgramBlock> childBlocks, - ArrayList<String> resultVarNames, ExecutionContext ec ) + ArrayList<ResultVar> resultVars, ExecutionContext ec ) { - _resultVarNames = resultVarNames; - _childBlocks = childBlocks; - _ec = ec; + _resultVars = resultVars; + _childBlocks = childBlocks; + _ec = ec; } - public LocalVariableMap getVariables() - { + public LocalVariableMap getVariables() { return _ec.getVariables(); } - public ArrayList<String> getResultVarNames() - { - return _resultVarNames; + public ArrayList<ResultVar> getResultVariables() { + return _resultVars; } - public void setResultVarNames(ArrayList<String> resultVarNames) - { - _resultVarNames = resultVarNames; + public void setResultVariables(ArrayList<ResultVar> resultVars) { + _resultVars = resultVars; } - public ArrayList<ProgramBlock> getChildBlocks() - { + public ArrayList<ProgramBlock> getChildBlocks() { return _childBlocks; } - public void setChildBlocks(ArrayList<ProgramBlock> childBlocks) - { + public void setChildBlocks(ArrayList<ProgramBlock> childBlocks) { _childBlocks = childBlocks; } - public ExecutionContext getEc() - { + public ExecutionContext getEc() { return _ec; } - public void setEc(ExecutionContext ec) - { + public void setEc(ExecutionContext ec) { _ec = ec; } } http://git-wip-us.apache.org/repos/asf/systemml/blob/47498514/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ParWorker.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ParWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ParWorker.java index 281ce07..cf5cbcf 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ParWorker.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ParWorker.java @@ -24,7 +24,7 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - +import org.apache.sysml.parser.ParForStatementBlock.ResultVar; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.LocalVariableMap; import org.apache.sysml.runtime.controlprogram.ProgramBlock; @@ -43,11 +43,9 @@ import org.apache.sysml.runtime.instructions.cp.IntObject; */ public abstract class ParWorker { - protected static final Log LOG = LogFactory.getLog(ParWorker.class.getName()); protected long _workerID = -1; - protected ArrayList<ProgramBlock> _childBlocks = null; public ExecutionContext getExecutionContext() { @@ -55,34 +53,28 @@ public abstract class ParWorker } protected ExecutionContext _ec = null; - protected ArrayList<String> _resultVars = null; + protected ArrayList<ResultVar> _resultVars = null; protected boolean _monitor = false; protected long _numTasks = -1; protected long _numIters = -1; - public ParWorker() - { + public ParWorker() { //implicit constructor (required if parameters not known on object creation, //e.g., RemoteParWorkerMapper) } - public ParWorker( long ID, ParForBody body, boolean monitor ) - { + public ParWorker( long ID, ParForBody body, boolean monitor ) { _workerID = ID; - - if( body != null ) - { + if( body != null ) { _childBlocks = body.getChildBlocks(); - _ec = body.getEc(); - _resultVars = body.getResultVarNames(); + _ec = body.getEc(); + _resultVars = body.getResultVariables(); } - _monitor = monitor; - - _numTasks = 0; - _numIters = 0; + _numTasks = 0; + _numIters = 0; } public LocalVariableMap getVariables() @@ -107,21 +99,15 @@ public abstract class ParWorker * * @return number of executed iterations */ - public long getExecutedIterations() - { + public long getExecutedIterations() { return _numIters; } - protected void pinResultVariables() - { - for( String var : _resultVars ) - { - Data dat = _ec.getVariable(var); + protected void pinResultVariables() { + for( ResultVar var : _resultVars ) { + Data dat = _ec.getVariable(var._name); if( dat instanceof MatrixObject ) - { - MatrixObject mo = (MatrixObject)dat; - mo.enableCleanup(false); - } + ((MatrixObject)dat).enableCleanup(false); } } http://git-wip-us.apache.org/repos/asf/systemml/blob/47498514/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java index ea3deb5..d5cd518 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java @@ -45,6 +45,7 @@ import org.apache.sysml.parser.ParForStatementBlock; import org.apache.sysml.parser.StatementBlock; import org.apache.sysml.parser.Expression.DataType; import org.apache.sysml.parser.Expression.ValueType; +import org.apache.sysml.parser.ParForStatementBlock.ResultVar; import org.apache.sysml.parser.WhileStatementBlock; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.codegen.CodegenUtils; @@ -726,7 +727,7 @@ public class ProgramConverter throws DMLRuntimeException { ArrayList<ProgramBlock> pbs = body.getChildBlocks(); - ArrayList<String> rVnames = body.getResultVarNames(); + ArrayList<ResultVar> rVnames = body.getResultVariables(); ExecutionContext ec = body.getEc(); if( pbs.isEmpty() ) @@ -741,7 +742,7 @@ public class ProgramConverter //handle DMLScript UUID (propagate original uuid for writing to scratch space) sb.append( DMLScript.getUUID() ); sb.append( COMPONENTS_DELIM ); - sb.append( NEWLINE ); + sb.append( NEWLINE ); //handle DML config sb.append( ConfigurationManager.getDMLConfig().serializeDMLConfig() ); @@ -763,7 +764,7 @@ public class ProgramConverter sb.append( NEWLINE ); //handle result variable names - sb.append( serializeStringArrayList(rVnames) ); + sb.append( serializeResultVariables(rVnames) ); sb.append( COMPONENTS_DELIM ); //handle execution context @@ -1024,6 +1025,18 @@ public class ProgramConverter return sb.toString(); } + public static String serializeResultVariables( ArrayList<ResultVar> vars) { + StringBuilder sb = new StringBuilder(); + int count=0; + for( ResultVar var : vars ) { + if(count>0) + sb.append( ELEMENT_DELIM ); + sb.append( var._isAccum ? var._name+"+" : var._name ); + count++; + } + return sb.toString(); + } + public static String serializeStringArrayList( ArrayList<String> vars) { StringBuilder sb = new StringBuilder(); @@ -1179,7 +1192,7 @@ public class ProgramConverter sb.append( pfpb.getIterVar() ); sb.append( COMPONENTS_DELIM ); - sb.append( serializeStringArrayList( pfpb.getResultVariables()) ); + sb.append( serializeResultVariables( pfpb.getResultVariables()) ); sb.append( COMPONENTS_DELIM ); sb.append( serializeStringHashMap( pfpb.getParForParams()) ); //parameters of nested parfor sb.append( COMPONENTS_DELIM ); @@ -1323,13 +1336,13 @@ public class ProgramConverter //handle result variable names String rvarStr = st.nextToken(); - ArrayList<String> rvars = parseStringArrayList(rvarStr); - body.setResultVarNames(rvars); + ArrayList<ResultVar> rvars = parseResultVariables(rvarStr); + body.setResultVariables(rvars); //handle execution context String ecStr = st.nextToken(); ExecutionContext ec = parseExecutionContext( ecStr, prog ); - + //handle program blocks String spbs = st.nextToken(); ArrayList<ProgramBlock> pbs = rParseProgramBlocks(spbs, prog, id); @@ -1337,7 +1350,7 @@ public class ProgramConverter body.setChildBlocks( pbs ); body.setEc( ec ); - return body; + return body; } public static Program parseProgram( String in, int id ) @@ -1499,7 +1512,7 @@ public class ProgramConverter //inputs String iterVar = st.nextToken(); - ArrayList<String> resultVars = parseStringArrayList(st.nextToken()); + ArrayList<ResultVar> resultVars = parseResultVariables(st.nextToken()); HashMap<String,String> params = parseStringHashMap(st.nextToken()); //instructions @@ -1639,6 +1652,15 @@ public class ProgramConverter } return insts; } + + private static ArrayList<ResultVar> parseResultVariables(String in) { + ArrayList<ResultVar> ret = new ArrayList<>(); + for(String var : parseStringArrayList(in)) { + boolean accum = var.endsWith("+"); + ret.add(new ResultVar(accum ? var.substring(0, var.length()-1) : var, accum)); + } + return ret; + } private static HashMap<String,String> parseStringHashMap( String in ) { HashMap<String,String> vars = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/systemml/blob/47498514/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java index 06be2b1..52ac922 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java @@ -154,7 +154,7 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF ParForBody body = ProgramConverter.parseParForBody(_prog, (int)_workerID); _childBlocks = body.getChildBlocks(); _ec = body.getEc(); - _resultVars = body.getResultVarNames(); + _resultVars = body.getResultVariables(); _numTasks = 0; _numIters = 0; http://git-wip-us.apache.org/repos/asf/systemml/blob/47498514/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java index 4c05791..6086d25 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java @@ -154,8 +154,8 @@ public class RemoteDPParWorkerReducer extends ParWorker String in = MRJobConfiguration.getProgramBlocks(job); ParForBody body = ProgramConverter.parseParForBody(in, (int)_workerID); _childBlocks = body.getChildBlocks(); - _ec = body.getEc(); - _resultVars = body.getResultVarNames(); + _ec = body.getEc(); + _resultVars = body.getResultVariables(); //init local cache manager if( !CacheableData.isCachingActive() ) { @@ -176,7 +176,7 @@ public class RemoteDPParWorkerReducer extends ParWorker CacheableData.disableCaching(); _numTasks = 0; - _numIters = 0; + _numIters = 0; } catch(Exception ex) { @@ -192,8 +192,8 @@ public class RemoteDPParWorkerReducer extends ParWorker } @Override - public void close() - throws IOException + public void close() + throws IOException { try { http://git-wip-us.apache.org/repos/asf/systemml/blob/47498514/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java index 52a19f0..41dc53c 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java @@ -25,6 +25,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.Map.Entry; +import java.util.stream.Collectors; import org.apache.spark.TaskContext; import org.apache.spark.api.java.function.PairFlatMapFunction; @@ -107,14 +108,15 @@ public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFun ParForBody body = ProgramConverter.parseParForBody(_prog, (int)_workerID); _childBlocks = body.getChildBlocks(); _ec = body.getEc(); - _resultVars = body.getResultVarNames(); + _resultVars = body.getResultVariables(); _numTasks = 0; _numIters = 0; //reuse shared inputs (to read shared inputs once per process instead of once per core; //we reuse everything except result variables and partitioned input matrices) _ec.pinVariables(_ec.getVarList()); //avoid cleanup of shared inputs - Collection<String> blacklist = UtilFunctions.asSet(_resultVars, _ec.getVarListPartitioned()); + Collection<String> blacklist = UtilFunctions.asSet(_resultVars.stream() + .map(v -> v._name).collect(Collectors.toList()), _ec.getVarListPartitioned()); reuseVars.reuseVariables(_jobid, _ec.getVariables(), blacklist); //init and register-cleanup of buffer pool (in parfor spark, multiple tasks might http://git-wip-us.apache.org/repos/asf/systemml/blob/47498514/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java index 30c57b8..7c24ae5 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java @@ -37,6 +37,7 @@ import scala.Tuple2; import org.apache.sysml.api.DMLScript; import org.apache.sysml.conf.ConfigurationManager; import org.apache.sysml.parser.Expression.DataType; +import org.apache.sysml.parser.ParForStatementBlock.ResultVar; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.LocalVariableMap; import org.apache.sysml.runtime.controlprogram.ParForProgramBlock; @@ -89,11 +90,11 @@ public class RemoteParForUtils } } - public static void exportResultVariables( long workerID, LocalVariableMap vars, ArrayList<String> resultVars, OutputCollector<Writable, Writable> out ) + public static void exportResultVariables( long workerID, LocalVariableMap vars, ArrayList<ResultVar> resultVars, OutputCollector<Writable, Writable> out ) throws DMLRuntimeException, IOException { exportResultVariables(workerID, vars, resultVars, null, out); - } + } /** * For remote MR parfor workers. @@ -106,7 +107,7 @@ public class RemoteParForUtils * @throws DMLRuntimeException if DMLRuntimeException occurs * @throws IOException if IOException occurs */ - public static void exportResultVariables( long workerID, LocalVariableMap vars, ArrayList<String> resultVars, + public static void exportResultVariables( long workerID, LocalVariableMap vars, ArrayList<ResultVar> resultVars, HashMap<String,String> rvarFnames, OutputCollector<Writable, Writable> out ) throws DMLRuntimeException, IOException { @@ -115,9 +116,9 @@ public class RemoteParForUtils Text ovalue = new Text(); //foreach result variables probe if export necessary - for( String rvar : resultVars ) + for( ResultVar rvar : resultVars ) { - Data dat = vars.get( rvar ); + Data dat = vars.get( rvar._name ); //export output variable to HDFS (see RunMRJobs) if ( dat != null && dat.getDataType() == DataType.MATRIX ) @@ -127,13 +128,13 @@ public class RemoteParForUtils { if( ParForProgramBlock.ALLOW_REUSE_MR_PAR_WORKER && rvarFnames!=null ) { - String fname = rvarFnames.get( rvar ); + String fname = rvarFnames.get( rvar._name ); if( fname!=null ) mo.setFileName( fname ); //export result var (iff actually modified in parfor) mo.exportData(); //note: this is equivalent to doing it in close (currently not required because 1 Task=1Map tasks, hence only one map invocation) - rvarFnames.put(rvar, mo.getFileName()); + rvarFnames.put(rvar._name, mo.getFileName()); } else { @@ -143,7 +144,7 @@ public class RemoteParForUtils //pass output vars (scalars by value, matrix by ref) to result //(only if actually exported, hence in check for dirty, otherwise potential problems in result merge) - String datStr = ProgramConverter.serializeDataObject(rvar, mo); + String datStr = ProgramConverter.serializeDataObject(rvar._name, mo); ovalue.set( datStr ); out.collect( okey, ovalue ); } @@ -161,31 +162,25 @@ public class RemoteParForUtils * @throws DMLRuntimeException if DMLRuntimeException occurs * @throws IOException if IOException occurs */ - public static ArrayList<String> exportResultVariables( long workerID, LocalVariableMap vars, ArrayList<String> resultVars) + public static ArrayList<String> exportResultVariables( long workerID, LocalVariableMap vars, ArrayList<ResultVar> resultVars) throws DMLRuntimeException, IOException { ArrayList<String> ret = new ArrayList<>(); //foreach result variables probe if export necessary - for( String rvar : resultVars ) - { - Data dat = vars.get( rvar ); - + for( ResultVar rvar : resultVars ) { + Data dat = vars.get( rvar._name ); //export output variable to HDFS (see RunMRJobs) - if ( dat != null && dat.getDataType() == DataType.MATRIX ) - { + if ( dat != null && dat.getDataType() == DataType.MATRIX ) { MatrixObject mo = (MatrixObject) dat; - if( mo.isDirty() ) - { + if( mo.isDirty() ) { //export result var (iff actually modified in parfor) mo.exportData(); - - //pass output vars (scalars by value, matrix by ref) to result //(only if actually exported, hence in check for dirty, otherwise potential problems in result merge) - ret.add( ProgramConverter.serializeDataObject(rvar, mo) ); + ret.add( ProgramConverter.serializeDataObject(rvar._name, mo) ); } - } + } } return ret; http://git-wip-us.apache.org/repos/asf/systemml/blob/47498514/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParWorkerMapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParWorkerMapper.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParWorkerMapper.java index 730adbd..7029061 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParWorkerMapper.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParWorkerMapper.java @@ -164,15 +164,15 @@ public class RemoteParWorkerMapper extends ParWorker //MapReduceBase not requir String in = MRJobConfiguration.getProgramBlocks(job); ParForBody body = ProgramConverter.parseParForBody(in, (int)_workerID); _childBlocks = body.getChildBlocks(); - _ec = body.getEc(); - _resultVars = body.getResultVarNames(); + _ec = body.getEc(); + _resultVars = body.getResultVariables(); //init local cache manager if( !CacheableData.isCachingActive() ) { String uuid = IDHandler.createDistributedUniqueID(); LocalFileUtils.createWorkingDirectoryWithUUID( uuid ); CacheableData.initCaching( uuid ); //incl activation, cache dir creation (each map task gets its own dir for simplified cleanup) - } + } if( !CacheableData.cacheEvictionLocalFilePrefix.contains("_") ){ //account for local mode CacheableData.cacheEvictionLocalFilePrefix = CacheableData.cacheEvictionLocalFilePrefix +"_" + _workerID; } http://git-wip-us.apache.org/repos/asf/systemml/blob/47498514/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMerge.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMerge.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMerge.java index ca7aad6..5e814fa 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMerge.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMerge.java @@ -26,8 +26,10 @@ import org.apache.commons.logging.LogFactory; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; +import org.apache.sysml.runtime.instructions.InstructionUtils; import org.apache.sysml.runtime.matrix.data.DenseBlock; import org.apache.sysml.runtime.matrix.data.MatrixBlock; +import org.apache.sysml.runtime.matrix.operators.BinaryOperator; /** * Due to independence of all iterations, any result has the following properties: @@ -38,22 +40,24 @@ import org.apache.sysml.runtime.matrix.data.MatrixBlock; public abstract class ResultMerge { protected static final Log LOG = LogFactory.getLog(ResultMerge.class.getName()); - protected static final String NAME_SUFFIX = "_rm"; + protected static final BinaryOperator PLUS = InstructionUtils.parseBinaryOperator("+"); //inputs to result merge protected MatrixObject _output = null; protected MatrixObject[] _inputs = null; protected String _outputFName = null; + protected boolean _isAccum = false; protected ResultMerge( ) { //do nothing } - public ResultMerge( MatrixObject out, MatrixObject[] in, String outputFilename ) { + public ResultMerge( MatrixObject out, MatrixObject[] in, String outputFilename, boolean accum ) { _output = out; _inputs = in; _outputFName = outputFilename; + _isAccum = accum; } /** @@ -89,7 +93,10 @@ public abstract class ResultMerge */ protected void mergeWithoutComp( MatrixBlock out, MatrixBlock in, boolean appendOnly ) throws DMLRuntimeException { //pass through to matrix block operations - out.merge(in, appendOnly); + if( _isAccum ) + out.binaryOperationsInPlace(PLUS, in); + else + out.merge(in, appendOnly); } /** @@ -109,44 +116,32 @@ public abstract class ResultMerge // (using sparse iterator would miss values set to 0) // * Explicit NaN awareness because for cases were original matrix contains // NaNs, since NaN != NaN, otherwise we would potentially overwrite results + // * For the case of accumulation, we add out += (new-old) to ensure correct results + // because all inputs have the old values replicated if( in.isEmptyBlock(false) ) { + if( _isAccum ) return; //nothing to do for( int i=0; i<in.getNumRows(); i++ ) for( int j=0; j<in.getNumColumns(); j++ ) if( compare.get(i, j) != 0 ) out.quickSetValue(i, j, 0); } - else if( in.isInSparseFormat() ) { //SPARSE + else { //SPARSE/DENSE int rows = in.getNumRows(); int cols = in.getNumColumns(); for( int i=0; i<rows; i++ ) for( int j=0; j<cols; j++ ) { - double value = in.getValueSparseUnsafe(i,j); //input value - if( (value != compare.get(i,j) && !Double.isNaN(value) ) //for new values only (div) - || Double.isNaN(value) != Double.isNaN(compare.get(i,j)) ) //NaN awareness + double valOld = compare.get(i,j); + double valNew = in.quickGetValue(i,j); //input value + if( (valNew != valOld && !Double.isNaN(valNew) ) //for changed values + || Double.isNaN(valNew) != Double.isNaN(valOld) ) //NaN awareness { + double value = !_isAccum ? valNew : + (out.quickGetValue(i, j) + (valNew - valOld)); out.quickSetValue(i, j, value); } } } - else { //DENSE - //guaranteed allocated due to empty case above - DenseBlock a = in.getDenseBlock(); - int rows = in.getNumRows(); - int cols = in.getNumColumns(); - for( int i=0; i<rows; i++ ) { - double[] avals = a.values(i); - int aix = a.pos(i); - for( int j=0; j<cols; j++ ) { - double value = avals[aix+j]; //input value - if( (value != compare.get(i,j) && !Double.isNaN(value) ) //for new values only (div) - || Double.isNaN(value) != Double.isNaN(compare.get(i,j)) ) //NaN awareness - { - out.quickSetValue( i, j, value ); - } - } - } - } } protected long computeNonZeros( MatrixObject out, List<MatrixObject> in ) { http://git-wip-us.apache.org/repos/asf/systemml/blob/47498514/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalAutomatic.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalAutomatic.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalAutomatic.java index fec05c7..e9d178a 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalAutomatic.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalAutomatic.java @@ -31,8 +31,8 @@ public class ResultMergeLocalAutomatic extends ResultMerge { private ResultMerge _rm = null; - public ResultMergeLocalAutomatic( MatrixObject out, MatrixObject[] in, String outputFilename ) { - super( out, in, outputFilename ); + public ResultMergeLocalAutomatic( MatrixObject out, MatrixObject[] in, String outputFilename, boolean accum ) { + super( out, in, outputFilename, accum ); } @Override @@ -46,9 +46,9 @@ public class ResultMergeLocalAutomatic extends ResultMerge long cols = mc.getCols(); if( OptimizerRuleBased.isInMemoryResultMerge(rows, cols, OptimizerUtils.getLocalMemBudget()) ) - _rm = new ResultMergeLocalMemory( _output, _inputs, _outputFName ); + _rm = new ResultMergeLocalMemory( _output, _inputs, _outputFName, _isAccum ); else - _rm = new ResultMergeLocalFile( _output, _inputs, _outputFName ); + _rm = new ResultMergeLocalFile( _output, _inputs, _outputFName, _isAccum ); MatrixObject ret = _rm.executeSerialMerge(); @@ -66,9 +66,9 @@ public class ResultMergeLocalAutomatic extends ResultMerge long cols = mc.getCols(); if( OptimizerRuleBased.isInMemoryResultMerge(par * rows, cols, OptimizerUtils.getLocalMemBudget()) ) - _rm = new ResultMergeLocalMemory( _output, _inputs, _outputFName ); + _rm = new ResultMergeLocalMemory( _output, _inputs, _outputFName, _isAccum ); else - _rm = new ResultMergeLocalFile( _output, _inputs, _outputFName ); + _rm = new ResultMergeLocalFile( _output, _inputs, _outputFName, _isAccum ); return _rm.executeParallelMerge(par); } http://git-wip-us.apache.org/repos/asf/systemml/blob/47498514/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java index af77783..049dbf1 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java @@ -77,9 +77,9 @@ public class ResultMergeLocalFile extends ResultMerge //internal comparison matrix private IDSequence _seq = null; - public ResultMergeLocalFile( MatrixObject out, MatrixObject[] in, String outputFilename ) + public ResultMergeLocalFile( MatrixObject out, MatrixObject[] in, String outputFilename, boolean accum ) { - super( out, in, outputFilename ); + super( out, in, outputFilename, accum ); _seq = new IDSequence(); } @@ -121,7 +121,7 @@ public class ResultMergeLocalFile extends ResultMerge merge( _outputFName, _output, inMO ); //create new output matrix (e.g., to prevent potential export<->read file access conflict - moNew = createNewMatrixObject( _output, inMO ); + moNew = createNewMatrixObject( _output, inMO ); } else { @@ -146,7 +146,7 @@ public class ResultMergeLocalFile extends ResultMerge return executeSerialMerge(); } - private MatrixObject createNewMatrixObject(MatrixObject output, ArrayList<MatrixObject> inMO ) + private MatrixObject createNewMatrixObject(MatrixObject output, ArrayList<MatrixObject> inMO) throws DMLRuntimeException { MetaDataFormat metadata = (MetaDataFormat) _output.getMetaData(); @@ -156,9 +156,8 @@ public class ResultMergeLocalFile extends ResultMerge MatrixCharacteristics mcOld = metadata.getMatrixCharacteristics(); OutputInfo oiOld = metadata.getOutputInfo(); InputInfo iiOld = metadata.getInputInfo(); - MatrixCharacteristics mc = new MatrixCharacteristics(mcOld.getRows(),mcOld.getCols(), - mcOld.getRowsPerBlock(),mcOld.getColsPerBlock()); - mc.setNonZeros( computeNonZeros(output, inMO) ); + MatrixCharacteristics mc = new MatrixCharacteristics(mcOld); + mc.setNonZeros(_isAccum ? -1 : computeNonZeros(output, inMO)); MetaDataFormat meta = new MetaDataFormat(mc,oiOld,iiOld); moNew.setMetaData( meta ); http://git-wip-us.apache.org/repos/asf/systemml/blob/47498514/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java index 991baca..a59e81c 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java @@ -45,8 +45,8 @@ public class ResultMergeLocalMemory extends ResultMerge //internal comparison matrix private DenseBlock _compare = null; - public ResultMergeLocalMemory( MatrixObject out, MatrixObject[] in, String outputFilename ) { - super( out, in, outputFilename ); + public ResultMergeLocalMemory( MatrixObject out, MatrixObject[] in, String outputFilename, boolean accum ) { + super( out, in, outputFilename, accum ); } @Override http://git-wip-us.apache.org/repos/asf/systemml/blob/47498514/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java index 7ea1543..4b1bb0e 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java @@ -70,9 +70,10 @@ public class ResultMergeRemoteMR extends ResultMerge //private int _max_retry = -1; private boolean _jvmReuse = false; - public ResultMergeRemoteMR(MatrixObject out, MatrixObject[] in, String outputFilename, long pfid, int numMappers, int numReducers, int replication, int max_retry, boolean jvmReuse) + public ResultMergeRemoteMR(MatrixObject out, MatrixObject[] in, String outputFilename, boolean accum, + long pfid, int numMappers, int numReducers, int replication, int max_retry, boolean jvmReuse) { - super(out, in, outputFilename); + super(out, in, outputFilename, accum); _pfid = pfid; _numMappers = numMappers; @@ -136,9 +137,8 @@ public class ResultMergeRemoteMR extends ResultMerge moNew = new MatrixObject(_output.getValueType(), _outputFName); OutputInfo oiOld = metadata.getOutputInfo(); InputInfo iiOld = metadata.getInputInfo(); - MatrixCharacteristics mc = new MatrixCharacteristics(mcOld.getRows(),mcOld.getCols(), - mcOld.getRowsPerBlock(),mcOld.getColsPerBlock()); - mc.setNonZeros( computeNonZeros(_output, inMO) ); + MatrixCharacteristics mc = new MatrixCharacteristics(mcOld); + mc.setNonZeros(_isAccum ? -1 : computeNonZeros(_output, inMO)); MetaDataFormat meta = new MetaDataFormat(mc,oiOld,iiOld); moNew.setMetaData( meta ); } http://git-wip-us.apache.org/repos/asf/systemml/blob/47498514/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java index 2b64bb2..804e490 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java @@ -54,9 +54,10 @@ public class ResultMergeRemoteSpark extends ResultMerge private int _numMappers = -1; private int _numReducers = -1; - public ResultMergeRemoteSpark(MatrixObject out, MatrixObject[] in, String outputFilename, ExecutionContext ec, int numMappers, int numReducers) + public ResultMergeRemoteSpark(MatrixObject out, MatrixObject[] in, String outputFilename, boolean accum, + ExecutionContext ec, int numMappers, int numReducers) { - super(out, in, outputFilename); + super(out, in, outputFilename, accum); _ec = ec; _numMappers = numMappers; @@ -83,8 +84,7 @@ public class ResultMergeRemoteSpark extends ResultMerge try { - if( _inputs != null && _inputs.length>0 ) - { + if( _inputs != null && _inputs.length>0 ) { //prepare compare MetaDataFormat metadata = (MetaDataFormat) _output.getMetaData(); MatrixCharacteristics mcOld = metadata.getMatrixCharacteristics(); @@ -97,24 +97,21 @@ public class ResultMergeRemoteSpark extends ResultMerge moNew = new MatrixObject(_output.getValueType(), _outputFName); OutputInfo oiOld = metadata.getOutputInfo(); InputInfo iiOld = metadata.getInputInfo(); - MatrixCharacteristics mc = new MatrixCharacteristics(mcOld.getRows(),mcOld.getCols(), - mcOld.getRowsPerBlock(),mcOld.getColsPerBlock()); - mc.setNonZeros( computeNonZeros(_output, Arrays.asList(_inputs)) ); + MatrixCharacteristics mc = new MatrixCharacteristics(mcOld); + mc.setNonZeros(_isAccum ? -1 : computeNonZeros(_output, Arrays.asList(_inputs))); MetaDataFormat meta = new MetaDataFormat(mc,oiOld,iiOld); moNew.setMetaData( meta ); moNew.setRDDHandle( ro ); } - else - { + else { moNew = _output; //return old matrix, to prevent copy } } - catch(Exception ex) - { + catch(Exception ex) { throw new DMLRuntimeException(ex); } - return moNew; + return moNew; } @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/systemml/blob/47498514/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java index f517bda..2a1fd19 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java @@ -26,6 +26,7 @@ import org.apache.sysml.conf.ConfigurationManager; import org.apache.sysml.hops.OptimizerUtils; import org.apache.sysml.lops.LopProperties; import org.apache.sysml.parser.ParForStatementBlock; +import org.apache.sysml.parser.ParForStatementBlock.ResultVar; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.LocalVariableMap; import org.apache.sysml.runtime.controlprogram.ParForProgramBlock; @@ -167,7 +168,7 @@ public class OptimizerConstrained extends OptimizerRuleBased super.rewriteSetTranposeSparseVectorOperations(pn, partitionedMatrices, ec.getVariables()); //rewrite 14: - HashSet<String> inplaceResultVars = new HashSet<>(); + HashSet<ResultVar> inplaceResultVars = new HashSet<>(); super.rewriteSetInPlaceResultIndexing(pn, M1, ec.getVariables(), inplaceResultVars, ec); //rewrite 15: @@ -183,7 +184,7 @@ public class OptimizerConstrained extends OptimizerRuleBased rewriteSetTaskPartitioner( pn, false, false ); //flagLIX always false // rewrite 14: set in-place result indexing - HashSet<String> inplaceResultVars = new HashSet<>(); + HashSet<ResultVar> inplaceResultVars = new HashSet<>(); super.rewriteSetInPlaceResultIndexing(pn, M1, ec.getVariables(), inplaceResultVars, ec); if( !OptimizerUtils.isSparkExecutionMode() ) { http://git-wip-us.apache.org/repos/asf/systemml/blob/47498514/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java index c9cb833..9fc0dbe 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java @@ -21,6 +21,7 @@ package org.apache.sysml.runtime.controlprogram.parfor.opt; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -62,6 +63,7 @@ import org.apache.sysml.parser.FunctionStatementBlock; import org.apache.sysml.parser.LanguageException; import org.apache.sysml.parser.ParForStatement; import org.apache.sysml.parser.ParForStatementBlock; +import org.apache.sysml.parser.ParForStatementBlock.ResultVar; import org.apache.sysml.parser.StatementBlock; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.ForProgramBlock; @@ -288,7 +290,7 @@ public class OptimizerRuleBased extends Optimizer rewriteSetTranposeSparseVectorOperations(pn, partitionedMatrices, ec.getVariables()); // rewrite 14: set in-place result indexing - HashSet<String> inplaceResultVars = new HashSet<>(); + HashSet<ResultVar> inplaceResultVars = new HashSet<>(); rewriteSetInPlaceResultIndexing(pn, M1, ec.getVariables(), inplaceResultVars, ec); // rewrite 15: disable caching @@ -303,7 +305,7 @@ public class OptimizerRuleBased extends Optimizer rewriteSetTaskPartitioner( pn, false, false ); //flagLIX always false // rewrite 14: set in-place result indexing - HashSet<String> inplaceResultVars = new HashSet<>(); + HashSet<ResultVar> inplaceResultVars = new HashSet<>(); rewriteSetInPlaceResultIndexing(pn, M1, ec.getVariables(), inplaceResultVars, ec); if( !OptimizerUtils.isSparkExecutionMode() ) { @@ -623,7 +625,7 @@ public class OptimizerRuleBased extends Optimizer return apply; } - protected boolean isResultPartitionableAll( Collection<OptNode> nlist, ArrayList<String> resultVars, LocalVariableMap vars, String iterVarname ) + protected boolean isResultPartitionableAll( Collection<OptNode> nlist, ArrayList<ResultVar> resultVars, LocalVariableMap vars, String iterVarname ) throws DMLRuntimeException { boolean ret = true; @@ -637,7 +639,7 @@ public class OptimizerRuleBased extends Optimizer return ret; } - protected boolean isResultPartitionable( OptNode n, ArrayList<String> resultVars, LocalVariableMap vars, String iterVarname ) + protected boolean isResultPartitionable( OptNode n, ArrayList<ResultVar> resultVars, LocalVariableMap vars, String iterVarname ) throws DMLRuntimeException { boolean ret = true; @@ -1663,7 +1665,7 @@ public class OptimizerRuleBased extends Optimizer //REWRITE set in-place result indexing /// - protected void rewriteSetInPlaceResultIndexing(OptNode pn, double M, LocalVariableMap vars, HashSet<String> inPlaceResultVars, ExecutionContext ec) + protected void rewriteSetInPlaceResultIndexing(OptNode pn, double M, LocalVariableMap vars, HashSet<ResultVar> inPlaceResultVars, ExecutionContext ec) throws DMLRuntimeException { //assertions (warnings of corrupt optimizer decisions) @@ -1677,14 +1679,14 @@ public class OptimizerRuleBased extends Optimizer //note currently we decide for all result vars jointly, i.e., //only if all fit pinned in remaining budget, we apply this rewrite. - ArrayList<String> retVars = pfpb.getResultVariables(); + ArrayList<ResultVar> retVars = pfpb.getResultVariables(); //compute total sum of pinned result variable memory double sum = computeTotalSizeResultVariables(retVars, vars, pfpb.getDegreeOfParallelism()); //NOTE: currently this rule is too conservative (the result variable is assumed to be dense and //most importantly counted twice if this is part of the maximum operation) - double totalMem = Math.max((M+sum), rComputeSumMemoryIntermediates(pn, new HashSet<String>())); + double totalMem = Math.max((M+sum), rComputeSumMemoryIntermediates(pn, new HashSet<ResultVar>())); //optimization decision if( rHasOnlyInPlaceSafeLeftIndexing(pn, retVars) ) //basic correctness constraint @@ -1710,8 +1712,8 @@ public class OptimizerRuleBased extends Optimizer { //add result vars to result and set state //will be serialized and transfered via symbol table - for( String var : retVars ){ - Data dat = vars.get(var); + for( ResultVar var : retVars ){ + Data dat = vars.get(var._name); if( dat instanceof MatrixObject ) ((MatrixObject)dat).setUpdateType(UpdateType.INPLACE_PINNED); } @@ -1719,10 +1721,10 @@ public class OptimizerRuleBased extends Optimizer } LOG.debug(getOptMode()+" OPT: rewrite 'set in-place result indexing' - result="+ - apply+" ("+ProgramConverter.serializeStringCollection(inPlaceResultVars)+", M="+toMB(totalMem)+")" ); + apply+" ("+Arrays.toString(inPlaceResultVars.toArray(new ResultVar[0]))+", M="+toMB(totalMem)+")" ); } - protected boolean rHasOnlyInPlaceSafeLeftIndexing( OptNode n, ArrayList<String> retVars ) + protected boolean rHasOnlyInPlaceSafeLeftIndexing( OptNode n, ArrayList<ResultVar> retVars ) throws DMLRuntimeException { boolean ret = true; @@ -1739,10 +1741,10 @@ public class OptimizerRuleBased extends Optimizer return ret; } - private static double computeTotalSizeResultVariables(ArrayList<String> retVars, LocalVariableMap vars, int k) { + private static double computeTotalSizeResultVariables(ArrayList<ResultVar> retVars, LocalVariableMap vars, int k) { double sum = 1; - for( String var : retVars ) { - Data dat = vars.get(var); + for( ResultVar var : retVars ) { + Data dat = vars.get(var._name); if( !(dat instanceof MatrixObject) ) continue; MatrixObject mo = (MatrixObject)dat; @@ -1762,7 +1764,7 @@ public class OptimizerRuleBased extends Optimizer //REWRITE disable CP caching /// - protected void rewriteDisableCPCaching(OptNode pn, HashSet<String> inplaceResultVars, LocalVariableMap vars) + protected void rewriteDisableCPCaching(OptNode pn, HashSet<ResultVar> inplaceResultVars, LocalVariableMap vars) throws DMLRuntimeException { //assertions (warnings of corrupt optimizer decisions) @@ -1784,7 +1786,7 @@ public class OptimizerRuleBased extends Optimizer LOG.debug(getOptMode()+" OPT: rewrite 'disable CP caching' - result="+apply+" (M="+toMB(M_sumInterm)+")" ); } - protected double rComputeSumMemoryIntermediates( OptNode n, HashSet<String> inplaceResultVars ) + protected double rComputeSumMemoryIntermediates( OptNode n, HashSet<ResultVar> inplaceResultVars ) throws DMLRuntimeException { double sum = 0; @@ -2061,24 +2063,24 @@ public class OptimizerRuleBased extends Optimizer ParForProgramBlock pfpb = (ParForProgramBlock) OptTreeConverter .getAbstractPlanMapping().getMappedProg(n.getID())[1]; - ArrayList<String> cleanedVars = new ArrayList<>(); - ArrayList<String> resultVars = pfpb.getResultVariables(); + ArrayList<ResultVar> cleanedVars = new ArrayList<>(); + ArrayList<ResultVar> resultVars = pfpb.getResultVariables(); String itervar = pfpb.getIterVar(); - for( String rvar : resultVars ) { - Data dat = ec.getVariable(rvar); + for( ResultVar rvar : resultVars ) { + Data dat = ec.getVariable(rvar._name); if( dat instanceof MatrixObject && ((MatrixObject)dat).getNnz()!=0 //subject to result merge with compare && n.hasOnlySimpleChilds() //guaranteed no conditional indexing - && rContainsResultFullReplace(n, rvar, itervar, (MatrixObject)dat) //guaranteed full matrix replace + && rContainsResultFullReplace(n, rvar._name, itervar, (MatrixObject)dat) //guaranteed full matrix replace //&& !pfsb.variablesRead().containsVariable(rvar) //never read variable in loop body - && !rIsReadInRightIndexing(n, rvar) //never read variable in loop body + && !rIsReadInRightIndexing(n, rvar._name) //never read variable in loop body && ((MatrixObject)dat).getNumRows()<=Integer.MAX_VALUE && ((MatrixObject)dat).getNumColumns()<=Integer.MAX_VALUE ) { //replace existing matrix object with empty matrix MatrixObject mo = (MatrixObject)dat; ec.cleanupCacheableData(mo); - ec.setMatrixOutput(rvar, new MatrixBlock((int)mo.getNumRows(), (int)mo.getNumColumns(),false), null); + ec.setMatrixOutput(rvar._name, new MatrixBlock((int)mo.getNumRows(), (int)mo.getNumColumns(),false), null); //keep track of cleaned result variables cleanedVars.add(rvar); @@ -2086,7 +2088,8 @@ public class OptimizerRuleBased extends Optimizer } _numEvaluatedPlans++; - LOG.debug(getOptMode()+" OPT: rewrite 'remove unnecessary compare matrix' - result="+(!cleanedVars.isEmpty())+" ("+ProgramConverter.serializeStringCollection(cleanedVars)+")" ); + LOG.debug(getOptMode()+" OPT: rewrite 'remove unnecessary compare matrix' - result="+(!cleanedVars.isEmpty()) + +" ("+ProgramConverter.serializeResultVariables(cleanedVars)+")" ); } protected boolean rContainsResultFullReplace( OptNode n, String resultVar, String iterVarname, MatrixObject mo ) @@ -2231,13 +2234,13 @@ public class OptimizerRuleBased extends Optimizer LOG.debug(getOptMode()+" OPT: rewrite 'set result merge' - result="+ret ); } - protected boolean determineFlagCellFormatWoCompare( ArrayList<String> resultVars, LocalVariableMap vars ) + protected boolean determineFlagCellFormatWoCompare( ArrayList<ResultVar> resultVars, LocalVariableMap vars ) { boolean ret = true; - for( String rVar : resultVars ) + for( ResultVar rVar : resultVars ) { - Data dat = vars.get(rVar); + Data dat = vars.get(rVar._name); if( dat == null || !(dat instanceof MatrixObject) ) { ret = false; @@ -2250,8 +2253,7 @@ public class OptimizerRuleBased extends Optimizer OutputInfo oi = meta.getOutputInfo(); long nnz = meta.getMatrixCharacteristics().getNonZeros(); - if( oi == OutputInfo.BinaryBlockOutputInfo || nnz != 0 ) - { + if( oi == OutputInfo.BinaryBlockOutputInfo || nnz != 0 ) { ret = false; break; } @@ -2261,7 +2263,7 @@ public class OptimizerRuleBased extends Optimizer return ret; } - protected boolean hasResultMRLeftIndexing( OptNode n, ArrayList<String> resultVars, LocalVariableMap vars, boolean checkSize ) + protected boolean hasResultMRLeftIndexing( OptNode n, ArrayList<ResultVar> resultVars, LocalVariableMap vars, boolean checkSize ) throws DMLRuntimeException { boolean ret = false; @@ -2309,7 +2311,7 @@ public class OptimizerRuleBased extends Optimizer * @return true if result sizes larger than local memory budget * @throws DMLRuntimeException if DMLRuntimeException occurs */ - protected boolean hasLargeTotalResults( OptNode pn, ArrayList<String> resultVars, LocalVariableMap vars, boolean checkSize ) + protected boolean hasLargeTotalResults( OptNode pn, ArrayList<ResultVar> resultVars, LocalVariableMap vars, boolean checkSize ) throws DMLRuntimeException { double totalSize = 0; @@ -2319,17 +2321,17 @@ public class OptimizerRuleBased extends Optimizer int k = pn.getK(); long W = estimateNumTasks(tp, _N, k); - for( String var : resultVars ) + for( ResultVar var : resultVars ) { //Potential unknowns: for local result var of child parfor (but we're only interested in top level) //Potential scalars: for disabled dependency analysis and unbounded scoping - Data dat = vars.get( var ); + Data dat = vars.get( var._name ); if( dat != null && dat instanceof MatrixObject ) { - MatrixObject mo = (MatrixObject) vars.get( var ); + MatrixObject mo = (MatrixObject) dat; long rows = mo.getNumRows(); - long cols = mo.getNumColumns(); + long cols = mo.getNumColumns(); long nnz = mo.getNnz(); if( nnz > 0 ) //w/ compare @@ -2364,7 +2366,7 @@ public class OptimizerRuleBased extends Optimizer return W; } - protected boolean hasOnlyInMemoryResults( OptNode n, ArrayList<String> resultVars, LocalVariableMap vars, boolean inLocal ) + protected boolean hasOnlyInMemoryResults( OptNode n, ArrayList<ResultVar> resultVars, LocalVariableMap vars, boolean inLocal ) throws DMLRuntimeException { boolean ret = true; http://git-wip-us.apache.org/repos/asf/systemml/blob/47498514/src/main/java/org/apache/sysml/runtime/instructions/InstructionUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/InstructionUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/InstructionUtils.java index 85a45ff..ff85aa7 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/InstructionUtils.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/InstructionUtils.java @@ -528,7 +528,6 @@ public class InstructionUtils } public static BinaryOperator parseBinaryOperator(String opcode) - throws DMLRuntimeException { if(opcode.equalsIgnoreCase("==")) return new BinaryOperator(Equals.getEqualsFnObject()); @@ -583,7 +582,7 @@ public class InstructionUtils else if ( opcode.equalsIgnoreCase("min") ) return new BinaryOperator(Builtin.getBuiltinFnObject("min")); - throw new DMLRuntimeException("Unknown binary opcode " + opcode); + throw new RuntimeException("Unknown binary opcode " + opcode); } public static TernaryOperator parseTernaryOperator(String opcode) { http://git-wip-us.apache.org/repos/asf/systemml/blob/47498514/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java index 432adec..e06c8c1 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java @@ -630,12 +630,6 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab return 0; return denseBlock.get(r, c); } - - public double getValueSparseUnsafe(int r, int c) { - if(sparseBlock==null || sparseBlock.isEmpty(r)) - return 0; - return sparseBlock.get(r, c); - } /** * Append value is only used when values are appended at the end of each row for the sparse representation http://git-wip-us.apache.org/repos/asf/systemml/blob/47498514/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForAccumulatorResultMergeTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForAccumulatorResultMergeTest.java b/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForAccumulatorResultMergeTest.java new file mode 100644 index 0000000..7936c26 --- /dev/null +++ b/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForAccumulatorResultMergeTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.test.integration.functions.parfor; + +import java.util.HashMap; + +import org.junit.Test; +import org.apache.sysml.api.DMLScript; +import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM; +import org.apache.sysml.lops.LopProperties.ExecType; +import org.apache.sysml.runtime.matrix.data.MatrixValue.CellIndex; +import org.apache.sysml.test.integration.AutomatedTestBase; +import org.apache.sysml.test.integration.TestConfiguration; +import org.apache.sysml.test.utils.TestUtils; + +public class ParForAccumulatorResultMergeTest extends AutomatedTestBase +{ + private final static String TEST_DIR = "functions/parfor/"; + private final static String TEST_NAME1 = "parfor_accumulator1"; //local merge + private final static String TEST_NAME2 = "parfor_accumulator2"; //remote MR merge + private final static String TEST_NAME3 = "parfor_accumulator3"; //remote SPARK merge + + private final static String TEST_CLASS_DIR = TEST_DIR + ParForAccumulatorResultMergeTest.class.getSimpleName() + "/"; + + private final static double eps = 0; + private final static int rows = 1210; + private final static int cols = 345; + + + @Override + public void setUp() { + addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] { "R" }) ); + addTestConfiguration(TEST_NAME2, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME2, new String[] { "R" }) ); + addTestConfiguration(TEST_NAME3, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME3, new String[] { "R" }) ); + } + + @Test + public void testParForAccumulatorLocalEmptyDense() { + runParForAccumulatorResultMergeTest(TEST_NAME1, false, false, ExecType.CP); + } + + @Test + public void testParForAccumulatorLocalEmptySparse() { + runParForAccumulatorResultMergeTest(TEST_NAME1, false, true, ExecType.CP); + } + + @Test + public void testParForAccumulatorLocalInitDense() { + runParForAccumulatorResultMergeTest(TEST_NAME1, true, false, ExecType.CP); + } + + @Test + public void testParForAccumulatorLocalInitSparse() { + runParForAccumulatorResultMergeTest(TEST_NAME1, true, true, ExecType.CP); + } + + private void runParForAccumulatorResultMergeTest( String test, boolean init, boolean sparse, ExecType et ) + { + RUNTIME_PLATFORM platformOld = rtplatform; + switch( et ) { + case CP: rtplatform = RUNTIME_PLATFORM.SINGLE_NODE; break; + case MR: rtplatform = RUNTIME_PLATFORM.HYBRID; break; + case SPARK: rtplatform = RUNTIME_PLATFORM.HYBRID_SPARK; break; + default: throw new RuntimeException("Unsupported exec type: "+et.name()); + } + boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG; + if( et == ExecType.SPARK ) + DMLScript.USE_LOCAL_SPARK_CONFIG = true; + + try { + String TEST_NAME = test; + TestConfiguration config = getTestConfiguration(TEST_NAME); + config.addVariable("rows", rows); + config.addVariable("cols", cols); + loadTestConfiguration(config); + + String HOME = SCRIPT_DIR + TEST_DIR; + fullDMLScriptName = HOME + TEST_NAME + ".dml"; + programArgs = new String[]{"-args", + String.valueOf(rows), String.valueOf(cols), String.valueOf(init).toUpperCase(), + String.valueOf(sparse).toUpperCase(), output("R") }; + + fullRScriptName = HOME + TEST_NAME + ".R"; + rCmd = "Rscript" + " " + fullRScriptName + " " + + String.valueOf(rows) + " " + String.valueOf(cols) + " " + String.valueOf(init).toUpperCase() + + " " + String.valueOf(sparse).toUpperCase() + " " + expectedDir(); + + //run tests + runTest(true, false, null, -1); + runRScript(true); + + //compare matrices + HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromHDFS("R"); + HashMap<CellIndex, Double> rfile = readRMatrixFromFS("R"); + TestUtils.compareMatrices(dmlfile, rfile, eps, "DML", "R"); + } + finally { + rtplatform = platformOld; + DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld; + } + } +} http://git-wip-us.apache.org/repos/asf/systemml/blob/47498514/src/test/scripts/functions/parfor/parfor_accumulator1.R ---------------------------------------------------------------------- diff --git a/src/test/scripts/functions/parfor/parfor_accumulator1.R b/src/test/scripts/functions/parfor/parfor_accumulator1.R new file mode 100644 index 0000000..a1c587c --- /dev/null +++ b/src/test/scripts/functions/parfor/parfor_accumulator1.R @@ -0,0 +1,39 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + + +args <- commandArgs(TRUE) +options(digits=22) + +library("Matrix") + +rlen = as.integer(args[1]); +clen = as.integer(args[2]); + +R = matrix(ifelse(as.logical(args[3]), 7, 0), rlen, clen); +if( as.logical(args[4]) ) { + R[,50:300] = matrix(0, rlen, 251); +} +for(i in 1:10) { + R = R + matrix(i, rlen, clen); +} + +writeMM(as(R, "CsparseMatrix"), paste(args[5], "R", sep="")); http://git-wip-us.apache.org/repos/asf/systemml/blob/47498514/src/test/scripts/functions/parfor/parfor_accumulator1.dml ---------------------------------------------------------------------- diff --git a/src/test/scripts/functions/parfor/parfor_accumulator1.dml b/src/test/scripts/functions/parfor/parfor_accumulator1.dml new file mode 100644 index 0000000..26a090d --- /dev/null +++ b/src/test/scripts/functions/parfor/parfor_accumulator1.dml @@ -0,0 +1,34 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + +rlen = $1; +clen = $2; +init = $3 +sparse = $4; + +R = matrix(ifelse(init, 7, 0), rlen, clen); +if( sparse ) + R[,50:300] = matrix(0, rlen, 251); + +parfor(i in 1:10, opt=CONSTRAINED, resultmerge=LOCAL_AUTOMATIC) + R += matrix(i, rlen, clen); + +write(R, $5); http://git-wip-us.apache.org/repos/asf/systemml/blob/47498514/src/test_suites/java/org/apache/sysml/test/integration/functions/parfor/ZPackageSuite.java ---------------------------------------------------------------------- diff --git a/src/test_suites/java/org/apache/sysml/test/integration/functions/parfor/ZPackageSuite.java b/src/test_suites/java/org/apache/sysml/test/integration/functions/parfor/ZPackageSuite.java index c898a7a..8844be2 100644 --- a/src/test_suites/java/org/apache/sysml/test/integration/functions/parfor/ZPackageSuite.java +++ b/src/test_suites/java/org/apache/sysml/test/integration/functions/parfor/ZPackageSuite.java @@ -27,6 +27,7 @@ import org.junit.runners.Suite; @RunWith(Suite.class) @Suite.SuiteClasses({ ForLoopPredicateTest.class, + ParForAccumulatorResultMergeTest.class, ParForAdversarialLiteralsTest.class, ParForBlockwiseDataPartitioningTest.class, ParForColwiseDataPartitioningTest.class,
