This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new b9be8a3  [SYSTEMDS-3342] Fix remote parfor for cleaning pipeline 
enumeration
b9be8a3 is described below

commit b9be8a3da3b3d37fc654c9a4b285209646cb6d97
Author: Matthias Boehm <[email protected]>
AuthorDate: Sat Apr 2 01:02:12 2022 +0200

    [SYSTEMDS-3342] Fix remote parfor for cleaning pipeline enumeration
    
    This patch makes several improvements to the generality of remote parfor
    jobs, especially with regard to frames and eval function calls and their
    dynamic function compilation on demand.
    
    * Robustness for converting nested parfors (for remote spark)
    * Broadcasting of frame inputs to remote parfor workers
    * Handling of function shipping for remote parfor w/ eval
    * Eval in remote parfor with forced load of function scripts
      and pre-materialized DMLProgram as function dictionary
    * Robust loading of function call DAGs w/ partial overlap
      with existing functions
    * Forced single-node exec mode in non-local remote parfor workers to prevent
      spark context creation when loading and compiling functions in remote parfor
    * Handling of local mode (outdated), now only spark-specific, not yarn-specific
    * Bandit builtin script with fix for proper indexing
    * Additional tests for misc issues
---
 scripts/builtin/bandit.dml                         |  6 +-
 src/main/java/org/apache/sysds/hops/Hop.java       |  2 +-
 .../apache/sysds/hops/ParameterizedBuiltinOp.java  |  5 +-
 .../org/apache/sysds/parser/StatementBlock.java    |  2 +-
 .../sysds/parser/dml/DmlSyntacticValidator.java    | 12 +++-
 .../runtime/controlprogram/ParForProgramBlock.java |  3 +-
 .../controlprogram/paramserv/SparkPSWorker.java    |  6 +-
 .../controlprogram/parfor/RemoteDPParForSpark.java |  3 +-
 .../parfor/RemoteDPParForSparkWorker.java          | 19 ++++--
 .../controlprogram/parfor/RemoteParForSpark.java   |  6 +-
 .../parfor/RemoteParForSparkWorker.java            | 18 ++++--
 .../controlprogram/parfor/RemoteParForUtils.java   | 28 +++------
 .../instructions/cp/EvalNaryCPInstruction.java     |  2 +-
 .../cp/ParamservBuiltinCPInstruction.java          |  5 +-
 .../sysds/runtime/util/ProgramConverter.java       | 22 +++----
 .../parfor/misc/ParForEvalBuiltinTest.java         | 73 ++++++++++++++++++++++
 .../scripts/functions/parfor/parfor_eval_local.dml | 29 +++++++++
 .../functions/parfor/parfor_eval_remote.dml        | 29 +++++++++
 .../functions/parfor/parfor_eval_remote2.dml       | 38 +++++++++++
 19 files changed, 245 insertions(+), 63 deletions(-)

diff --git a/scripts/builtin/bandit.dml b/scripts/builtin/bandit.dml
index 25c826c..dd5b5d8 100644
--- a/scripts/builtin/bandit.dml
+++ b/scripts/builtin/bandit.dml
@@ -485,8 +485,10 @@ return (Double accuracy, Matrix[Double] evalFunHp, 
Matrix[Double] hpForPruning,
       [trainX, trainy, testX, testy, Tr, hpForPruning, changesByOp, 
changesByPip] = executePipeline(pipeline=as.frame(pipList['ph']),
         Xtrain=trainX, Ytrain=trainy, Xtest= testX, Ytest=testy, 
metaList=metaList, hyperParameters=as.matrix(pipList['hp']), 
hpForPruning=hpForPruning,
         changesByOp=changesByOp, flagsCount=as.scalar(pipList['flags']), 
test=TRUE, verbose=FALSE)
-      cvChanges[cvk] = changesByOp
-      allChanges[i] =  changesByPip
+      #TODO double check why this is necessary
+      mincol = min(ncol(cvChanges),ncol(changesByOp))
+      cvChanges[cvk,1:mincol] = changesByOp[,1:mincol];
+      allChanges[i] = changesByPip
     }
     if(changesByPip < ref)
       print("prunning alert 2: no training the model due to minimum changes")
diff --git a/src/main/java/org/apache/sysds/hops/Hop.java 
b/src/main/java/org/apache/sysds/hops/Hop.java
index af89b4d..344bb30 100644
--- a/src/main/java/org/apache/sysds/hops/Hop.java
+++ b/src/main/java/org/apache/sysds/hops/Hop.java
@@ -1026,7 +1026,7 @@ public abstract class Hop implements ParseInfo {
        public abstract Lop constructLops();
 
        protected final ExecType optFindExecType() {
-               return 
optFindExecType(OptimizerUtils.ALLOW_TRANSITIVE_SPARK_EXEC_TYPE ? true : false);
+               return 
optFindExecType(OptimizerUtils.ALLOW_TRANSITIVE_SPARK_EXEC_TYPE);
        }
        
        protected abstract ExecType optFindExecType(boolean transitive);
diff --git a/src/main/java/org/apache/sysds/hops/ParameterizedBuiltinOp.java 
b/src/main/java/org/apache/sysds/hops/ParameterizedBuiltinOp.java
index 7f9d71e..55e6d79 100644
--- a/src/main/java/org/apache/sysds/hops/ParameterizedBuiltinOp.java
+++ b/src/main/java/org/apache/sysds/hops/ParameterizedBuiltinOp.java
@@ -168,19 +168,17 @@ public class ParameterizedBuiltinOp extends 
MultiThreadedHop {
                for (Entry<String, Integer> cur : _paramIndexMap.entrySet())
                        inputlops.put(cur.getKey(), 
getInput().get(cur.getValue()).constructLops());
 
+               ExecType et = optFindExecType();
                switch( _op ) {
                        case GROUPEDAGG: { 
-                               ExecType et = optFindExecType();
                                constructLopsGroupedAggregate(inputlops, et);
                                break;
                        }
                        case RMEMPTY: {
-                               ExecType et = optFindExecType();
                                constructLopsRemoveEmpty(inputlops, et);
                                break;
                        } 
                        case REXPAND: {
-                               ExecType et = optFindExecType();
                                constructLopsRExpand(inputlops, et);
                                break;
                        } 
@@ -198,7 +196,6 @@ public class ParameterizedBuiltinOp extends 
MultiThreadedHop {
                        case PARAMSERV:
                        case LIST:
                        case AUTODIFF:{
-                               ExecType et = optFindExecType();
                                ParameterizedBuiltin pbilop = new 
ParameterizedBuiltin(
                                        inputlops, _op, getDataType(), 
getValueType(), et);
                                setOutputDimensions(pbilop);
diff --git a/src/main/java/org/apache/sysds/parser/StatementBlock.java 
b/src/main/java/org/apache/sysds/parser/StatementBlock.java
index 6e9545c..1604a28 100644
--- a/src/main/java/org/apache/sysds/parser/StatementBlock.java
+++ b/src/main/java/org/apache/sysds/parser/StatementBlock.java
@@ -614,7 +614,7 @@ public class StatementBlock extends LiveVariableAnalysis 
implements ParseInfo
                                {
                                        fdict = 
prog.createNamespace(DMLProgram.BUILTIN_NAMESPACE);
                                        Map<String,FunctionStatementBlock> fsbs 
= DmlSyntacticValidator
-                                               
.loadAndParseBuiltinFunction(fexpr.getName(), DMLProgram.BUILTIN_NAMESPACE);
+                                               
.loadAndParseBuiltinFunction(fexpr.getName(), DMLProgram.BUILTIN_NAMESPACE, 
false);
                                        for( 
Entry<String,FunctionStatementBlock> fsb : fsbs.entrySet() ) {
                                                if( 
!fdict.containsFunction(fsb.getKey()) )
                                                        
fdict.addFunction(fsb.getKey(), fsb.getValue());
diff --git 
a/src/main/java/org/apache/sysds/parser/dml/DmlSyntacticValidator.java 
b/src/main/java/org/apache/sysds/parser/dml/DmlSyntacticValidator.java
index 8c8a449..cbf61fc 100644
--- a/src/main/java/org/apache/sysds/parser/dml/DmlSyntacticValidator.java
+++ b/src/main/java/org/apache/sysds/parser/dml/DmlSyntacticValidator.java
@@ -638,7 +638,7 @@ public class DmlSyntacticValidator implements DmlListener {
                }
        }
 
-       public static Map<String,FunctionStatementBlock> 
loadAndParseBuiltinFunction(String name, String namespace) {
+       public static Map<String,FunctionStatementBlock> 
loadAndParseBuiltinFunction(String name, String namespace, boolean forced) {
                if( !Builtins.contains(name, true, false) ) {
                        throw new DMLRuntimeException("Function "
                                + DMLProgram.constructFunctionKey(namespace, 
name)+" is not a builtin function.");
@@ -649,10 +649,12 @@ public class DmlSyntacticValidator implements DmlListener 
{
                        new CustomErrorListener(), new HashMap<>(), namespace, 
new HashSet<>());
                String filePath = Builtins.getFilePath(name);
                FunctionDictionary<FunctionStatementBlock> dict = tmp
-                       .parseAndAddImportedFunctions(namespace, filePath, null)
+                       .parseAndAddImportedFunctions(namespace, filePath, 
null, forced)
                        .getBuiltinFunctionDictionary();
                
                //construct output map of all functions
+               if(dict == null)
+                       throw new RuntimeException("Failed function load: 
"+name+" "+namespace);
                return dict.getFunctions();
        }
 
@@ -1741,13 +1743,17 @@ public class DmlSyntacticValidator implements 
DmlListener {
        }
        
        private DMLProgram parseAndAddImportedFunctions(String namespace, 
String filePath, ParserRuleContext ctx) {
+               return parseAndAddImportedFunctions(namespace, filePath, ctx, 
false);
+       }
+       
+       private DMLProgram parseAndAddImportedFunctions(String namespace, 
String filePath, ParserRuleContext ctx, boolean forced) {
                //validate namespace w/ awareness of dml-bodied builtin 
functions
                validateNamespace(namespace, filePath, ctx);
                
                //read and parse namespace files
                String scriptID = DMLProgram.constructFunctionKey(namespace, 
filePath);
                DMLProgram prog = null;
-               if (!_f2NS.get().containsKey(scriptID)) {
+               if (forced || !_f2NS.get().containsKey(scriptID) ) {
                        _f2NS.get().put(scriptID, namespace);
                        try {
                                prog = new DMLParserWrapper().doParse(filePath,
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java 
b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
index dc60a79..8abf2e7 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
@@ -290,6 +290,7 @@ public class ParForProgramBlock extends ForProgramBlock
        public static final boolean USE_RANGE_TASKS_IF_USEFUL   = true; // use 
range tasks whenever size>3, false, otherwise wrong split order in remote 
        public static final boolean USE_STREAMING_TASK_CREATION = true; // 
start working while still creating tasks, prevents blocking due to too small 
task queue
        public static final boolean ALLOW_NESTED_PARALLELISM    = true; // if 
not, transparently change parfor to for on program conversions (local,remote)
+       public static final boolean CONVERT_NESTED_REMOTE_PARFOR = true; 
//convert parfor to for in remote parfor
        public static final boolean USE_PARALLEL_RESULT_MERGE   = false; // if 
result merge is run in parallel or serial 
        public static final boolean USE_PARALLEL_RESULT_MERGE_REMOTE = true; // 
if remote result merge should be run in parallel for multiple result vars
        public static final boolean CREATE_UNSCOPED_RESULTVARS  = true;
@@ -1141,7 +1142,7 @@ public class ParForProgramBlock extends ForProgramBlock
                        .map(v -> v._name).collect(Collectors.toSet());
                Set<String> brVars = inputs.keySet().stream()
                        .filter(v -> !retVars.contains(v))
-                       .filter(v -> ec.getVariable(v).getDataType().isMatrix())
+                       .filter(v -> 
ec.getVariable(v).getDataType().isMatrixOrFrame())
                        .filter(v -> 
OptimizerUtils.estimateSize(ec.getDataCharacteristics(v))< 2.14e9)
                        .collect(Collectors.toSet());
                return brVars;
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/SparkPSWorker.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/SparkPSWorker.java
index 5ae55a3..9e96b45 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/SparkPSWorker.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/SparkPSWorker.java
@@ -41,6 +41,7 @@ public class SparkPSWorker extends LocalPSWorker implements 
VoidFunction<Tuple2<
        private static final long serialVersionUID = -8674739573419648732L;
 
        private final String _program;
+       private final boolean _isLocal;
        private final HashMap<String, byte[]> _clsMap;
        private final SparkConf _conf;
        private final int _port; // rpc port
@@ -54,13 +55,14 @@ public class SparkPSWorker extends LocalPSWorker implements 
VoidFunction<Tuple2<
        private final LongAccumulator _nBatches; //number of executed batches
        private final LongAccumulator _nEpochs; //number of executed epoches
 
-       public SparkPSWorker(String updFunc, String aggFunc, 
Statement.PSFrequency freq, int epochs, long batchSize, String program, 
HashMap<String, byte[]> clsMap, SparkConf conf, int port, LongAccumulator 
aSetup, LongAccumulator aWorker, LongAccumulator aUpdate, LongAccumulator 
aIndex, LongAccumulator aGrad, LongAccumulator aRPC, LongAccumulator aBatches, 
LongAccumulator aEpochs, int nbatches, boolean modelAvg) {
+       public SparkPSWorker(String updFunc, String aggFunc, 
Statement.PSFrequency freq, int epochs, long batchSize, String program, boolean 
isLocal, HashMap<String, byte[]> clsMap, SparkConf conf, int port, 
LongAccumulator aSetup, LongAccumulator aWorker, LongAccumulator aUpdate, 
LongAccumulator aIndex, LongAccumulator aGrad, LongAccumulator aRPC, 
LongAccumulator aBatches, LongAccumulator aEpochs, int nbatches, boolean 
modelAvg) {
                _updFunc = updFunc;
                _aggFunc = aggFunc;
                _freq = freq;
                _epochs = epochs;
                _batchSize = batchSize;
                _program = program;
+               _isLocal = isLocal;
                _clsMap = clsMap;
                _conf = conf;
                _port = port;
@@ -105,7 +107,7 @@ public class SparkPSWorker extends LocalPSWorker implements 
VoidFunction<Tuple2<
                _ec = body.getEc();
 
                // Initialize the buffer pool and register it in the jvm 
shutdown hook in order to be cleanuped at the end
-               RemoteParForUtils.setupBufferPool(_workerID);
+               RemoteParForUtils.setupBufferPool(_workerID, _isLocal);
 
                // Create the ps proxy
                _ps = PSRpcFactory.createSparkPSProxy(_conf, _port, _aRPC);
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteDPParForSpark.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteDPParForSpark.java
index 6f01f8c..27a11dd 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteDPParForSpark.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteDPParForSpark.java
@@ -75,6 +75,7 @@ public class RemoteDPParForSpark
                
                SparkExecutionContext sec = (SparkExecutionContext)ec;
                JavaSparkContext sc = sec.getSparkContext();
+               boolean isLocal = sc.isLocal();
                
                //prepare input parameters
                MatrixObject mo = sec.getMatrixObject(matrixvar);
@@ -90,7 +91,7 @@ public class RemoteDPParForSpark
                int numReducers2 = Math.max(numReducers, Math.min(numParts, 
(int)dpf.getNumParts(mc)));
                
                //core parfor datapartition-execute (w/ or w/o shuffle, 
depending on data characteristics)
-               RemoteDPParForSparkWorker efun = new 
RemoteDPParForSparkWorker(program, clsMap, 
+               RemoteDPParForSparkWorker efun = new 
RemoteDPParForSparkWorker(program, isLocal, clsMap, 
                                matrixvar, itervar, enableCPCaching, mc, 
tSparseCol, dpf, fmt, aTasks, aIters);
                JavaPairRDD<Long,Writable> tmp = getPartitionedInput(sec, 
matrixvar, fmt, dpf);
                List<Tuple2<Long,String>> out = (requiresGrouping(dpf, mo) ?
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
index b4886d2..3526a7c 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
@@ -23,6 +23,8 @@ import org.apache.hadoop.io.Writable;
 import org.apache.spark.TaskContext;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.util.LongAccumulator;
+import org.apache.sysds.api.DMLScript;
+import org.apache.sysds.common.Types.ExecMode;
 import org.apache.sysds.common.Types.FileFormat;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.codegen.CodegenUtils;
@@ -31,7 +33,6 @@ import 
org.apache.sysds.runtime.controlprogram.ParForProgramBlock.PartitionForma
 import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.controlprogram.parfor.Task.TaskType;
-import 
org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysds.runtime.controlprogram.parfor.util.PairWritableBlock;
 import org.apache.sysds.runtime.controlprogram.parfor.util.PairWritableCell;
 import org.apache.sysds.runtime.instructions.cp.IntObject;
@@ -51,6 +52,7 @@ public class RemoteDPParForSparkWorker extends ParWorker 
implements PairFlatMapF
        private static final long serialVersionUID = 30223759283155139L;
        
        private final String  _prog;
+       private final boolean _isLocal;
        private final HashMap<String, byte[]> _clsMap;
        private final boolean _caching;
        private final String _inputVar;
@@ -66,11 +68,12 @@ public class RemoteDPParForSparkWorker extends ParWorker 
implements PairFlatMapF
        private final LongAccumulator _aTasks;
        private final LongAccumulator _aIters;
        
-       public RemoteDPParForSparkWorker(String program, HashMap<String, 
byte[]> clsMap, String inputVar, String iterVar,
-               boolean cpCaching, DataCharacteristics mc, boolean tSparseCol, 
PartitionFormat dpf, FileFormat fmt,
-               LongAccumulator atasks, LongAccumulator aiters)
+       public RemoteDPParForSparkWorker(String program, boolean isLocal, 
HashMap<String, byte[]> clsMap,
+               String inputVar, String iterVar, boolean cpCaching, 
DataCharacteristics mc, boolean tSparseCol,
+               PartitionFormat dpf, FileFormat fmt, LongAccumulator atasks, 
LongAccumulator aiters)
        {
                _prog = program;
+               _isLocal = isLocal;
                _clsMap = clsMap;
                _caching = cpCaching;
                _inputVar = inputVar;
@@ -148,14 +151,18 @@ public class RemoteDPParForSparkWorker extends ParWorker 
implements PairFlatMapF
                _numIters    = 0;
 
                //setup the buffer pool
-               RemoteParForUtils.setupBufferPool(_workerID);
+               RemoteParForUtils.setupBufferPool(_workerID, _isLocal);
 
                //ensure that resultvar files are not removed
                super.pinResultVariables();
                
                //enable/disable caching (if required and not in CP process)
-               if( !_caching && !InfrastructureAnalyzer.isLocalMode() )
+               if( !_caching && !_isLocal )
                        CacheableData.disableCaching();
+               
+               //ensure local mode for eval function loading on demand
+               if( !_isLocal )
+                       DMLScript.setGlobalExecMode(ExecMode.SINGLE_NODE);
        }
        
        /**
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteParForSpark.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteParForSpark.java
index 87c1a8a..3057cb5 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteParForSpark.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteParForSpark.java
@@ -38,7 +38,6 @@ import 
org.apache.sysds.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
-import 
org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysds.runtime.controlprogram.parfor.util.IDSequence;
 import org.apache.sysds.runtime.instructions.cp.Data;
 import org.apache.sysds.runtime.instructions.cp.ScalarObject;
@@ -74,6 +73,7 @@ public class RemoteParForSpark
                
                SparkExecutionContext sec = (SparkExecutionContext)ec;
                JavaSparkContext sc = sec.getSparkContext();
+               boolean isLocal = sc.isLocal();
                
                //initialize accumulators for tasks/iterations
                LongAccumulator aTasks = sc.sc().longAccumulator("tasks");
@@ -81,7 +81,7 @@ public class RemoteParForSpark
                
                //reset cached shared inputs for correctness in local mode
                long jobid = _jobID.getNextID();
-               if( InfrastructureAnalyzer.isLocalMode() )
+               if( isLocal )
                        RemoteParForSparkWorker.cleanupCachedVariables(jobid);
 
                // broadcast the inputs except the result variables
@@ -95,7 +95,7 @@ public class RemoteParForSpark
                //run remote_spark parfor job 
                //(w/o lazy evaluation to fit existing parfor framework, e.g., 
result merge)
                List<Tuple2<Long, String>> out = sc.parallelize(tasks, 
tasks.size()) //create rdd of parfor tasks
-                       .flatMapToPair(new RemoteParForSparkWorker(jobid, prog,
+                       .flatMapToPair(new RemoteParForSparkWorker(jobid, prog, 
isLocal,
                                clsMap, cpCaching, aTasks, aIters, brInputs, 
topLevelPF, serialLineage))
                        .collect(); //execute and get output handles
                
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
index b002f04..326d2e4 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
@@ -34,10 +34,10 @@ import 
org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.util.LongAccumulator;
 import org.apache.sysds.api.DMLScript;
+import org.apache.sysds.common.Types.ExecMode;
 import org.apache.sysds.runtime.codegen.CodegenUtils;
 import org.apache.sysds.runtime.controlprogram.caching.CacheBlock;
 import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
-import 
org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysds.runtime.lineage.Lineage;
 import org.apache.sysds.runtime.util.CollectionUtils;
 import org.apache.sysds.runtime.util.ProgramConverter;
@@ -52,6 +52,7 @@ public class RemoteParForSparkWorker extends ParWorker 
implements PairFlatMapFun
        
        private final long _jobid;
        private final String _prog;
+       private final boolean _isLocal;
        private final HashMap<String, byte[]> _clsMap;
        private boolean _initialized = false;
        private boolean _caching = true;
@@ -63,12 +64,13 @@ public class RemoteParForSparkWorker extends ParWorker 
implements PairFlatMapFun
 
        private final Map<String, Broadcast<CacheBlock>> _brInputs;
        
-       public RemoteParForSparkWorker(long jobid, String program, 
HashMap<String, byte[]> clsMap, boolean cpCaching,
-                       LongAccumulator atasks, LongAccumulator aiters, 
Map<String, Broadcast<CacheBlock>> brInputs, 
-                       boolean cleanCache, Map<String,String> lineage) 
+       public RemoteParForSparkWorker(long jobid, String program, boolean 
isLocal,
+               HashMap<String, byte[]> clsMap, boolean cpCaching, 
LongAccumulator atasks, LongAccumulator aiters,
+               Map<String, Broadcast<CacheBlock>> brInputs, boolean 
cleanCache, Map<String,String> lineage) 
        {
                _jobid = jobid;
                _prog = program;
+               _isLocal = isLocal;
                _clsMap = clsMap;
                _initialized = false;
                _caching = cpCaching;
@@ -139,15 +141,19 @@ public class RemoteParForSparkWorker extends ParWorker 
implements PairFlatMapFun
                reuseVars.reuseVariables(_jobid, _ec.getVariables(), 
excludeList, _brInputs, _cleanCache);
                
                //setup the buffer pool
-               RemoteParForUtils.setupBufferPool(_workerID);
+               RemoteParForUtils.setupBufferPool(_workerID, _isLocal);
 
                //ensure that resultvar files are not removed
                super.pinResultVariables();
                
                //enable/disable caching (if required and not in CP process)
-               if( !_caching && !InfrastructureAnalyzer.isLocalMode() )
+               if( !_caching && !_isLocal )
                        CacheableData.disableCaching();
                
+               //ensure local mode for eval function loading on demand
+               if( !_isLocal )
+                       DMLScript.setGlobalExecMode(ExecMode.SINGLE_NODE);
+
                //enable and setup lineage
                if( _lineage != null ) {
                        DMLScript.LINEAGE = true;
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteParForUtils.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteParForUtils.java
index b7cce58..33b8284 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteParForUtils.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteParForUtils.java
@@ -216,23 +216,13 @@ public class RemoteParForUtils
        /**
         * Cleanup all temporary files created by this SystemDS process.
         */
-       public static void cleanupWorkingDirectories()
-       {
-               //use the given job configuration for infrastructure analysis 
(see configure);
-               //this is important for robustness w/ misconfigured classpath 
which also contains
-               //core-default.xml and hence hides the actual cluster 
configuration; otherwise
-               //there is missing cleanup of working directories 
-               JobConf job = ConfigurationManager.getCachedJobConf();
-               
-               if( !InfrastructureAnalyzer.isLocalMode(job) )
-               {
-                       //delete cache files
-                       CacheableData.cleanupCacheDir();
-                       //disable caching (prevent dynamic eviction)
-                       CacheableData.disableCaching();
-                       //cleanup working dir (e.g., of CP_FILE instructions)
-                       LocalFileUtils.cleanupWorkingDirectory();
-               }
+       public static void cleanupWorkingDirectories() {
+               //delete cache files
+               CacheableData.cleanupCacheDir();
+               //disable caching (prevent dynamic eviction)
+               CacheableData.disableCaching();
+               //cleanup working dir (e.g., of CP_FILE instructions)
+               LocalFileUtils.cleanupWorkingDirectory();
        }
 
        /**
@@ -294,13 +284,13 @@ public class RemoteParForUtils
         * @param workerID worker id
         * @throws IOException exception
         */
-       public static void setupBufferPool(long workerID) throws IOException {
+       public static void setupBufferPool(long workerID, boolean isLocal) 
throws IOException {
                //init and register-cleanup of buffer pool (in spark, multiple 
tasks might
                //share the process-local, i.e., per executor, buffer pool; 
hence we synchronize
                //the initialization and immediately register the created 
directory for cleanup
                //on process exit, i.e., executor exit, including any files 
created in the future.
                synchronized(CacheableData.class) {
-                       if (!CacheableData.isCachingActive() && 
!InfrastructureAnalyzer.isLocalMode()) {
+                       if (!CacheableData.isCachingActive() && !isLocal) {
                                //create id, executor working dir, and cache dir
                                String uuid = 
IDHandler.createDistributedUniqueID();
                                
LocalFileUtils.createWorkingDirectoryWithUUID(uuid);
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/cp/EvalNaryCPInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/cp/EvalNaryCPInstruction.java
index 82a13bb..46fbeeb 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/cp/EvalNaryCPInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/cp/EvalNaryCPInstruction.java
@@ -210,7 +210,7 @@ public class EvalNaryCPInstruction extends 
BuiltinNaryCPInstruction {
                //load builtin file and parse function statement block
                String nsName = DMLProgram.BUILTIN_NAMESPACE;
                Map<String,FunctionStatementBlock> fsbs = DmlSyntacticValidator
-                       .loadAndParseBuiltinFunction(name, nsName);
+                       .loadAndParseBuiltinFunction(name, nsName, true); 
//forced for remote parfor
                if( fsbs.isEmpty() )
                        throw new DMLRuntimeException("Failed to compile 
function '"+name+"'.");
                
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/cp/ParamservBuiltinCPInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/cp/ParamservBuiltinCPInstruction.java
index 67fa582..25353f6 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/cp/ParamservBuiltinCPInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/cp/ParamservBuiltinCPInstruction.java
@@ -246,7 +246,8 @@ public class ParamservBuiltinCPInstruction extends 
ParameterizedBuiltinCPInstruc
 
                // Get driver host
                String host = 
sec.getSparkContext().getConf().get("spark.driver.host");
-
+               boolean isLocal = sec.getSparkContext().isLocal();
+               
                // Create the netty server for ps
                TransportServer server = 
PSRpcFactory.createServer(sec.getSparkContext().getConf(),(LocalParamServer) 
ps, host); // Start the server
 
@@ -271,7 +272,7 @@ public class ParamservBuiltinCPInstruction extends ParameterizedBuiltinCPInstruc
 
                // Create remote workers
                SparkPSWorker worker = new SparkPSWorker(getParam(PS_UPDATE_FUN), getParam(PS_AGGREGATION_FUN),
-                       getFrequency(), getEpochs(), getBatchSize(), program, clsMap, sec.getSparkContext().getConf(),
+                       getFrequency(), getEpochs(), getBatchSize(), program, isLocal, clsMap, sec.getSparkContext().getConf(),
                        server.getPort(), aSetup, aWorker, aUpdate, aIndex, aGrad, aRPC, aBatch, aEpoch, nbatches, modelAvg);
 
                if (DMLScript.STATISTICS)
diff --git a/src/main/java/org/apache/sysds/runtime/util/ProgramConverter.java b/src/main/java/org/apache/sysds/runtime/util/ProgramConverter.java
index c892ffb..f4a9b0e 100644
--- a/src/main/java/org/apache/sysds/runtime/util/ProgramConverter.java
+++ b/src/main/java/org/apache/sysds/runtime/util/ProgramConverter.java
@@ -77,7 +77,6 @@ import org.apache.sysds.runtime.instructions.Instruction;
 import org.apache.sysds.runtime.instructions.InstructionParser;
 import org.apache.sysds.runtime.instructions.cp.BooleanObject;
 import org.apache.sysds.runtime.instructions.cp.CPInstruction;
-import org.apache.sysds.runtime.instructions.cp.CPOperand;
 import org.apache.sysds.runtime.instructions.cp.Data;
 import org.apache.sysds.runtime.instructions.cp.DoubleObject;
 import org.apache.sysds.runtime.instructions.cp.EvalNaryCPInstruction;
@@ -894,13 +893,11 @@ public class ProgramConverter
                                                }
                                        }
                                        else if(inst instanceof EvalNaryCPInstruction) {
-                                               CPOperand fname = ((EvalNaryCPInstruction)inst).getInputs()[0];
-                                               if( fname.isLiteral() )
-                                                       cand.add(DMLProgram.constructFunctionKey(DMLProgram.DEFAULT_NAMESPACE, fname.getName()));
-                                               else //add all potential targets, other than builtin functions
-                                                       pb.getProgram().getFunctionProgramBlocks().keySet().stream()
-                                                               .filter(s -> !s.startsWith(DMLProgram.BUILTIN_NAMESPACE))
-                                                               .forEach(s -> cand.add(s));
+                                               //add all potential targets, included loaded builtin functions because other 
+                                               //functions might call them directly (not through eval and thus cannot be loaded)
+                                               //(even if fname is a known literal, the target function might call other functions)
+                                               pb.getProgram().getFunctionProgramBlocks().keySet().stream()
+                                                       .forEach(s -> cand.add(s));
                                        }
                                }
                        }
@@ -1160,10 +1157,13 @@ public class ProgramConverter
        private static String rSerializeProgramBlock( ProgramBlock pb, HashMap<String, byte[]> clsMap ) {
                StringBuilder sb = new StringBuilder();
                
+               boolean pbFOR = pb instanceof ForProgramBlock 
+                       && (!(pb instanceof ParForProgramBlock) || ParForProgramBlock.CONVERT_NESTED_REMOTE_PARFOR);
+               
                //handle header
                if( pb instanceof WhileProgramBlock ) 
                        sb.append(PB_WHILE);
-               else if ( pb instanceof ForProgramBlock && !(pb instanceof ParForProgramBlock) )
+               else if ( pbFOR )
                        sb.append(PB_FOR);
                else if ( pb instanceof ParForProgramBlock )
                        sb.append(PB_PARFOR);
@@ -1185,7 +1185,7 @@ public class ProgramConverter
                        sb.append( rSerializeProgramBlocks( wpb.getChildBlocks(), clsMap) );
                        sb.append(PBS_END);
                }
-               else if ( pb instanceof ForProgramBlock && !(pb instanceof ParForProgramBlock ) ) {
+               else if ( pbFOR ) { // might catch parfor too
                        ForProgramBlock fpb = (ForProgramBlock) pb; 
                        sb.append( fpb.getIterVar() );
                        sb.append( COMPONENTS_DELIM );
@@ -1374,7 +1374,7 @@ public class ProgramConverter
 
        public static Program parseProgram( String in, int id ) {
                String lin = in.substring( PROG_BEGIN.length(),in.length()- 
PROG_END.length()).trim();
-               Program prog = new Program();
+               Program prog = new Program(new DMLProgram());
                parseFunctionProgramBlocks(lin, prog, id);
                return prog;
        }
diff --git a/src/test/java/org/apache/sysds/test/functions/parfor/misc/ParForEvalBuiltinTest.java b/src/test/java/org/apache/sysds/test/functions/parfor/misc/ParForEvalBuiltinTest.java
new file mode 100644
index 0000000..bfe995b
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/parfor/misc/ParForEvalBuiltinTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.parfor.misc;
+
+import org.junit.Test;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+
+public class ParForEvalBuiltinTest extends AutomatedTestBase 
+{
+       private final static String TEST_NAME1 = "parfor_eval_local";
+       private final static String TEST_NAME2 = "parfor_eval_remote";
+       private final static String TEST_NAME3 = "parfor_eval_remote2";
+       private final static String TEST_DIR = "functions/parfor/";
+       private final static String TEST_CLASS_DIR = TEST_DIR + ParForEvalBuiltinTest.class.getSimpleName() + "/";
+       
+       private final static int rows = 20;
+       private final static int cols = 10;
+       
+       @Override
+       public void setUp() {
+               addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] {"Rout"}) );
+               addTestConfiguration(TEST_NAME2, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME2, new String[] {"Rout"}) );
+               addTestConfiguration(TEST_NAME3, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME3, new String[] {"Rout"}) );
+       }
+
+       @Test
+       public void testParForEvalLocal() {
+               runFunctionTest(TEST_NAME1);
+       }
+       
+       @Test
+       public void testParForEvalRemote() {
+               runFunctionTest(TEST_NAME2);
+       }
+       
+       @Test
+       public void testParForEvalRemote2() {
+               runFunctionTest(TEST_NAME3);
+       }
+       
+       private void runFunctionTest( String testName ) {
+               TestConfiguration config = getTestConfiguration(testName);
+               config.addVariable("rows", rows);
+               config.addVariable("cols", cols);
+               loadTestConfiguration(config);
+               
+               String HOME = SCRIPT_DIR + TEST_DIR;
+               fullDMLScriptName = HOME + testName + ".dml";
+               programArgs = new String[]{"-args", 
+                       Integer.toString(rows), Integer.toString(cols)};
+
+               //run without errors on function loading
+               runTest(true, false, null, -1);
+       }
+}
diff --git a/src/test/scripts/functions/parfor/parfor_eval_local.dml b/src/test/scripts/functions/parfor/parfor_eval_local.dml
new file mode 100644
index 0000000..bda33a7
--- /dev/null
+++ b/src/test/scripts/functions/parfor/parfor_eval_local.dml
@@ -0,0 +1,29 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+X = rand(rows=$1, cols=$2)
+parfor(i in 1:10, opt=CONSTRAINED, mode=LOCAL) {
+  s = "outlierBySd";
+  if( i>5 )
+    s = "outlierByIQR";
+  print(toString(sum(eval(s,
+    list(X=X, k=1.5, repairMethod=1, max_iterations=0, verbose=FALSE)))));
+}
diff --git a/src/test/scripts/functions/parfor/parfor_eval_remote.dml b/src/test/scripts/functions/parfor/parfor_eval_remote.dml
new file mode 100644
index 0000000..c6bf05e
--- /dev/null
+++ b/src/test/scripts/functions/parfor/parfor_eval_remote.dml
@@ -0,0 +1,29 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+X = rand(rows=$1, cols=$2)
+parfor(i in 1:10, opt=CONSTRAINED, mode=REMOTE_SPARK) {
+  s = "outlierBySd";
+  if( i>5 )
+    s = "outlierByIQR";
+  print(toString(sum(eval(s,
+    list(X=X, k=1.5, repairMethod=1, max_iterations=0, verbose=FALSE)))));
+}
diff --git a/src/test/scripts/functions/parfor/parfor_eval_remote2.dml b/src/test/scripts/functions/parfor/parfor_eval_remote2.dml
new file mode 100644
index 0000000..cb0c60f
--- /dev/null
+++ b/src/test/scripts/functions/parfor/parfor_eval_remote2.dml
@@ -0,0 +1,38 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+X = rand(rows=$1, cols=$2)
+
+parfor(i in 1:10, opt=CONSTRAINED, mode=REMOTE_SPARK) {
+  s = "outlierBySd";
+  if( i>5 )
+    s = "outlierByIQR";
+  print(toString(sum(eval(s,
+    list(X=X, k=1.5, repairMethod=1, max_iterations=0, verbose=FALSE)))));
+}
+
+parfor(i2 in 1:10, opt=CONSTRAINED, mode=REMOTE_SPARK) {
+  s2 = "outlierBySd";
+  if( i2>5 )
+    s2 = "outlierByIQR";
+  print(toString(sum(eval(s2,
+    list(X=X, k=1.5, repairMethod=1, max_iterations=0, verbose=FALSE)))));
+}

Reply via email to