[SYSTEMML-1360][SYSTEMML-1290] Codegen instructions in parfor spark jobs

Our code generator compiles and loads generated operators in the driver.
Because these classes were not shipped to the executors, parfor Spark jobs
with spoof instructions in the body failed due to missing classes. This
patch now explicitly ships the bytecode of the necessary classes (those
referenced by instructions in the parfor body) to the parfor workers.

Furthermore, this patch hardens the transfer of generated classes for
distributed spoof Spark instructions by shipping the actual bytecode. It
also modifies the compilation chain so that generated class files are no
longer deleted (deleting them created a brittle dependence on the class cache).


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/d62b2fce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/d62b2fce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/d62b2fce

Branch: refs/heads/master
Commit: d62b2fced61e6000081e93ca0298a9a16b86d437
Parents: 9b69f36
Author: Matthias Boehm <[email protected]>
Authored: Thu Mar 2 14:26:38 2017 -0800
Committer: Matthias Boehm <[email protected]>
Committed: Thu Mar 2 14:26:38 2017 -0800

----------------------------------------------------------------------
 .../java/org/apache/sysml/api/DMLScript.java    |   7 +-
 .../sysml/runtime/codegen/CodegenUtils.java     |  74 ++------
 .../controlprogram/ParForProgramBlock.java      |  14 +-
 .../controlprogram/parfor/ProgramConverter.java | 187 ++++++++-----------
 .../parfor/RemoteDPParForSpark.java             |   9 +-
 .../parfor/RemoteDPParForSparkWorker.java       |  17 +-
 .../parfor/RemoteParForSpark.java               |   7 +-
 .../parfor/RemoteParForSparkWorker.java         |  13 +-
 .../instructions/cp/SpoofCPInstruction.java     |   7 +-
 .../instructions/spark/SpoofSPInstruction.java  |   2 +-
 10 files changed, 153 insertions(+), 184 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d62b2fce/src/main/java/org/apache/sysml/api/DMLScript.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/DMLScript.java 
b/src/main/java/org/apache/sysml/api/DMLScript.java
index e0eff6c..25c03ea 100644
--- a/src/main/java/org/apache/sysml/api/DMLScript.java
+++ b/src/main/java/org/apache/sysml/api/DMLScript.java
@@ -581,7 +581,7 @@ public class DMLScript
                AParserWrapper parser = AParserWrapper.createParser(parsePyDML);
                DMLProgram prog = parser.parse(DML_FILE_PATH_ANTLR_PARSER, 
dmlScriptStr, argVals);
                
-               //Step 4: construct HOP DAGs (incl LVA and validate)
+               //Step 4: construct HOP DAGs (incl LVA, validate, and setup)
                DMLTranslator dmlt = new DMLTranslator(prog);
                dmlt.liveVariableAnalysis(prog);                        
                dmlt.validateParseTree(prog);
@@ -592,6 +592,9 @@ public class DMLScript
                        dmlt.printHops(prog);
                        DMLTranslator.resetHopsDAGVisitStatus(prog);
                }
+               
+               //init working directories (before usage by following 
compilation steps)
+               initHadoopExecution( dmlconf );
        
                //Step 5: rewrite HOP DAGs (incl IPA and memory estimates)
                dmlt.rewriteHopsDAG(prog);
@@ -673,8 +676,6 @@ public class DMLScript
                ExecutionContext ec = null;
                try 
                {  
-                       initHadoopExecution( dmlconf );
-                       
                        //run execute (w/ exception handling to ensure proper 
shutdown)
                        ec = ExecutionContextFactory.createContext(rtprog);
                        rtprog.execute( ec );  

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d62b2fce/src/main/java/org/apache/sysml/runtime/codegen/CodegenUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/codegen/CodegenUtils.java 
b/src/main/java/org/apache/sysml/runtime/codegen/CodegenUtils.java
index 54b421c..f5470de 100644
--- a/src/main/java/org/apache/sysml/runtime/codegen/CodegenUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/codegen/CodegenUtils.java
@@ -19,11 +19,9 @@
 
 package org.apache.sysml.runtime.codegen;
 
-import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.ObjectOutputStream;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.Arrays;
@@ -48,9 +46,7 @@ import org.apache.sysml.utils.Statistics;
 
 public class CodegenUtils 
 {
-       //cache to reuse compiled and loaded classes (this is also a workaround 
for classes,
-       //compiled during initial compilation and subsequently loaded as the 
working directory
-       //is cleaned up just before the actual execution
+       //cache to reuse compiled and loaded classes 
        private static ConcurrentHashMap<String, Class<?>> _cache = new 
ConcurrentHashMap<String,Class<?>>();
        private static String _workingDir = null;
        
@@ -128,7 +124,13 @@ public class CodegenUtils
                return ret;
        }
        
-       public static Class<?> loadClass(String name, byte[] classBytes) throws 
DMLRuntimeException {
+       public static Class<?> loadClass(String name) throws 
DMLRuntimeException {
+               return loadClass(name, null);
+       }
+       
+       public static Class<?> loadClass(String name, byte[] classBytes) 
+               throws DMLRuntimeException 
+       {
                //reuse existing compiled class
                Class<?> ret = _cache.get(name);
                if( ret != null ) 
@@ -188,16 +190,10 @@ public class CodegenUtils
        public static byte[] getClassAsByteArray(String name) 
                throws DMLRuntimeException
        {
-               //reuse existing compiled class
-               Class<?> cls = _cache.get(name);
-               if( cls != null ) 
-                       return getClassAsByteArray(cls);
-               
-               
                String classAsPath = name.replace('.', '/') + ".class";
                
                URLClassLoader classLoader = null;
-               byte[] ret = null;
+               InputStream stream = null;
                
                try {
                        //dynamically load compiled class
@@ -205,56 +201,18 @@ public class CodegenUtils
                        classLoader = new URLClassLoader(
                                        new URL[]{new 
File(_workingDir).toURI().toURL(), runDir}, 
                                        CodegenUtils.class.getClassLoader());
-                       InputStream stream = 
classLoader.getResourceAsStream(classAsPath);
-                       ret = IOUtils.toByteArray(stream);
+                       stream = classLoader.getResourceAsStream(classAsPath);
+                       return IOUtils.toByteArray(stream);
                } 
                catch (IOException e) {
                        throw new DMLRuntimeException(e);
                }
                finally {
                        IOUtilFunctions.closeSilently(classLoader);
+                       IOUtilFunctions.closeSilently(stream);
                }
-               
-               return ret;
        }
 
-
-       public static byte[] getClassAsByteArray(Class<?> cls) 
-               throws DMLRuntimeException 
-       {
-               ByteArrayOutputStream bos = new ByteArrayOutputStream();
-               try {
-                       ObjectOutputStream oos = new ObjectOutputStream(bos);
-                       oos.writeObject(cls);
-                       oos.flush();
-                       return bos.toByteArray();
-               } 
-               catch( IOException e ) {
-                       throw new DMLRuntimeException(e);
-               } 
-               finally {
-                       IOUtilFunctions.closeSilently(bos);
-               }
-       }
-       
-       private static void createWorkingDir() throws DMLRuntimeException  {
-               if( _workingDir != null )
-                       return;
-               String tmp = 
LocalFileUtils.getWorkingDir(LocalFileUtils.CATEGORY_CODEGEN);
-               LocalFileUtils.createLocalFileIfNotExist(tmp);
-               _workingDir = tmp;
-       }
-       
-       public static URL[] getUrls() throws DMLRuntimeException {
-               try {
-                       URL runDir = 
CodegenUtils.class.getProtectionDomain().getCodeSource().getLocation(); 
-                       return new URL[]{new File(_workingDir).toURI().toURL(), 
runDir};
-               }
-               catch(Exception e) {
-                       throw new DMLRuntimeException(e);
-               }                               
-       }
-       
        public static String getSpoofType(Class<?> cls) {
                if(cls.getSuperclass() == SpoofCellwise.class)
                        return "Cell" +  cls.getName().split("\\.")[1];
@@ -269,4 +227,12 @@ public class CodegenUtils
        public static void clearClassCache() {
                _cache.clear();
        }
+       
+       private static void createWorkingDir() throws DMLRuntimeException  {
+               if( _workingDir != null )
+                       return;
+               String tmp = 
LocalFileUtils.getWorkingDir(LocalFileUtils.CATEGORY_CODEGEN);
+               LocalFileUtils.createLocalFileIfNotExist(tmp);
+               _workingDir = tmp;
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d62b2fce/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java 
b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
index 4cdfa7b..1023a80 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
@@ -986,8 +986,9 @@ public class ParForProgramBlock extends ForProgramBlock
                // Step 1) init parallel workers (serialize PBs)
                // NOTES: each mapper changes filenames with regard to his ID 
as we submit a single job,
                //        cannot reuse serialized string, since variables are 
serialized as well.
-               ParForBody body = new ParForBody( _childBlocks, _resultVars, ec 
);
-               String program = ProgramConverter.serializeParForBody( body );
+               ParForBody body = new ParForBody(_childBlocks, _resultVars, ec);
+               HashMap<String, byte[]> clsMap = new HashMap<String, byte[]>();
+               String program = ProgramConverter.serializeParForBody(body, 
clsMap);
                
                if( _monitor ) 
                        StatisticMonitor.putPFStat(_ID, 
Stat.PARFOR_INIT_PARWRK_T, time.stop());
@@ -1008,7 +1009,7 @@ public class ParForProgramBlock extends ForProgramBlock
                                
                // Step 3) submit Spark parfor job (no lazy evaluation, since 
collect on result)
                //MatrixObject colocatedDPMatrixObj = 
(_colocatedDPMatrix!=null)? (MatrixObject)ec.getVariable(_colocatedDPMatrix) : 
null;
-               RemoteParForJobReturn ret = RemoteParForSpark.runJob(_ID, 
program, tasks, ec, _enableCPCaching, _numThreads);
+               RemoteParForJobReturn ret = RemoteParForSpark.runJob(_ID, 
program, clsMap, tasks, ec, _enableCPCaching, _numThreads);
                
                if( _monitor ) 
                        StatisticMonitor.putPFStat(_ID, 
Stat.PARFOR_WAIT_EXEC_T, time.stop());
@@ -1050,7 +1051,8 @@ public class ParForProgramBlock extends ForProgramBlock
                // NOTES: each mapper changes filenames with regard to his ID 
as we submit a single job,
                //        cannot reuse serialized string, since variables are 
serialized as well.
                ParForBody body = new ParForBody( _childBlocks, _resultVars, ec 
);
-               String program = ProgramConverter.serializeParForBody( body );
+               HashMap<String, byte[]> clsMap = new HashMap<String, byte[]>(); 
+               String program = ProgramConverter.serializeParForBody( body, 
clsMap );
                
                if( _monitor ) 
                        StatisticMonitor.putPFStat(_ID, 
Stat.PARFOR_INIT_PARWRK_T, time.stop());
@@ -1071,8 +1073,8 @@ public class ParForProgramBlock extends ForProgramBlock
                OutputInfo inputOI = ((inputMatrix.getSparsity()<0.1 && 
inputDPF==PDataPartitionFormat.COLUMN_WISE)||
                                              (inputMatrix.getSparsity()<0.001 
&& inputDPF==PDataPartitionFormat.ROW_WISE))? 
                                             OutputInfo.BinaryCellOutputInfo : 
OutputInfo.BinaryBlockOutputInfo;
-               RemoteParForJobReturn ret = RemoteDPParForSpark.runJob(_ID, 
itervar.getName(), _colocatedDPMatrix, program, resultFile, 
-                               inputMatrix, ec, inputDPF, inputOI, 
_tSparseCol, _enableCPCaching, _numThreads );
+               RemoteParForJobReturn ret = RemoteDPParForSpark.runJob(_ID, 
itervar.getName(), _colocatedDPMatrix, program, clsMap, 
+                               resultFile, inputMatrix, ec, inputDPF, inputOI, 
_tSparseCol, _enableCPCaching, _numThreads );
                
                if( _monitor ) 
                        StatisticMonitor.putPFStat(_ID, 
Stat.PARFOR_WAIT_EXEC_T, time.stop());

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d62b2fce/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
index 5cc9ca1..00e72c4 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
@@ -47,6 +47,7 @@ import org.apache.sysml.parser.Expression.DataType;
 import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.parser.WhileStatementBlock;
 import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.codegen.CodegenUtils;
 import org.apache.sysml.runtime.controlprogram.ExternalFunctionProgramBlock;
 import org.apache.sysml.runtime.controlprogram.ExternalFunctionProgramBlockCP;
 import org.apache.sysml.runtime.controlprogram.ForProgramBlock;
@@ -75,6 +76,7 @@ import org.apache.sysml.runtime.instructions.cp.DoubleObject;
 import org.apache.sysml.runtime.instructions.cp.FunctionCallCPInstruction;
 import org.apache.sysml.runtime.instructions.cp.IntObject;
 import org.apache.sysml.runtime.instructions.cp.ScalarObject;
+import org.apache.sysml.runtime.instructions.cp.SpoofCPInstruction;
 import org.apache.sysml.runtime.instructions.cp.StringObject;
 import org.apache.sysml.runtime.instructions.cp.VariableCPInstruction;
 import org.apache.sysml.runtime.instructions.gpu.GPUInstruction;
@@ -88,17 +90,14 @@ import org.apache.sysml.runtime.matrix.data.OutputInfo;
 import org.apache.sysml.udf.ExternalFunctionInvocationInstruction;
 
 /**
- * Static functionalities for 
- *   * creating deep copies of program blocks, instructions, function program 
blocks
- *   * serializing and parsing of programs, program blocks, functions program 
blocks
+ * Program converter functionalities for 
+ *   (1) creating deep copies of program blocks, instructions, function 
program blocks, and 
+ *   (2) serializing and parsing of programs, program blocks, functions 
program blocks.
  * 
- * TODO: CV, EL, ELUse program blocks not considered so far (not for BI 2.0 
release)
- * TODO: rewrite class to instance-based invocation (grown gradually and now 
inappropriate design)
- *
  */
+//TODO: rewrite class to instance-based invocation (grown gradually and now 
inappropriate design)
 public class ProgramConverter 
 {
-       
        protected static final Log LOG = 
LogFactory.getLog(ProgramConverter.class.getName());
     
        //use escaped unicodes for separators in order to prevent string 
conflict
@@ -721,7 +720,11 @@ public class ProgramConverter
        // SERIALIZATION 
        ////////////////////////////////        
 
-       public static String serializeParForBody( ParForBody body ) 
+       public static String serializeParForBody( ParForBody body ) throws 
DMLRuntimeException {
+               return serializeParForBody(body, new HashMap<String, byte[]>());
+       }       
+       
+       public static String serializeParForBody( ParForBody body, 
HashMap<String,byte[]> clsMap ) 
                throws DMLRuntimeException
        {
                ArrayList<ProgramBlock> pbs = body.getChildBlocks();
@@ -755,14 +758,14 @@ public class ProgramConverter
                //handle program
                sb.append( PARFOR_PROG_BEGIN );
                sb.append( NEWLINE );
-               sb.append( serializeProgram(prog, pbs) );
+               sb.append( serializeProgram(prog, pbs, clsMap) );
                sb.append( PARFOR_PROG_END );
                sb.append( NEWLINE );
                sb.append( COMPONENTS_DELIM );
                sb.append( NEWLINE );
                
                //handle result variable names
-               sb.append( serializeStringArrayList( rVnames ) );
+               sb.append( serializeStringArrayList(rVnames) );
                sb.append( COMPONENTS_DELIM );
                
                //handle execution context
@@ -778,7 +781,7 @@ public class ProgramConverter
                //handle program blocks -- ONLY instructions, not variables.
                sb.append( PARFOR_PBS_BEGIN );
                sb.append( NEWLINE );
-               sb.append( rSerializeProgramBlocks(pbs) );
+               sb.append( rSerializeProgramBlocks(pbs, clsMap) );
                sb.append( PARFOR_PBS_END );
                sb.append( NEWLINE );
                
@@ -787,7 +790,7 @@ public class ProgramConverter
                return sb.toString();           
        }
 
-       public static String serializeProgram( Program prog, 
ArrayList<ProgramBlock> pbs ) 
+       private static String serializeProgram( Program prog, 
ArrayList<ProgramBlock> pbs, HashMap<String, byte[]> clsMap ) 
                throws DMLRuntimeException
        {
                //note program contains variables, programblocks and function 
program blocks 
@@ -797,10 +800,10 @@ public class ProgramConverter
                HashSet<String> cand = new HashSet<String>();
                rFindSerializationCandidates(pbs, cand);
                
-               return rSerializeFunctionProgramBlocks( fpb, cand );
+               return rSerializeFunctionProgramBlocks( fpb, cand, clsMap );
        }
 
-       public static void rFindSerializationCandidates( 
ArrayList<ProgramBlock> pbs, HashSet<String> cand ) 
+       private static void rFindSerializationCandidates( 
ArrayList<ProgramBlock> pbs, HashSet<String> cand ) 
                throws DMLRuntimeException
        {
                for( ProgramBlock pb : pbs )
@@ -842,7 +845,7 @@ public class ProgramConverter
                }
        }
 
-       public static String serializeVariables (LocalVariableMap vars) 
+       private static String serializeVariables (LocalVariableMap vars) 
                throws DMLRuntimeException
        {
                StringBuilder sb = new StringBuilder();
@@ -914,26 +917,12 @@ public class ProgramConverter
                return sb.toString();
        }
 
-       public static String serializeExecutionContext( ExecutionContext ec ) 
-               throws DMLRuntimeException
-       {
-               String ret = null;
-               
-               if( ec != null )
-               {
-                       LocalVariableMap vars = ec.getVariables();
-                       ret = serializeVariables( vars );       
-               }
-               else
-               {
-                       ret = EMPTY;
-               }
-               
-               return ret;
+       private static String serializeExecutionContext( ExecutionContext ec ) 
throws DMLRuntimeException {
+               return (ec != null) ? serializeVariables( ec.getVariables() ) : 
EMPTY;
        }
 
        @SuppressWarnings("all")
-       public static String serializeInstructions( ArrayList<Instruction> inst 
) 
+       private static String serializeInstructions( ArrayList<Instruction> 
inst, HashMap<String, byte[]> clsMap ) 
                throws DMLRuntimeException
        {       
                StringBuilder sb = new StringBuilder();
@@ -941,11 +930,13 @@ public class ProgramConverter
                for( Instruction linst : inst )
                {
                        //check that only cp instruction are transmitted 
-                       if( !(   linst instanceof CPInstruction
-                                 || linst instanceof SPInstruction
-                                 || linst instanceof 
ExternalFunctionInvocationInstruction ) )
-                       {
+                       if( !( linst instanceof CPInstruction || linst 
instanceof ExternalFunctionInvocationInstruction ) )
                                throw new DMLRuntimeException( 
NOT_SUPPORTED_MR_INSTRUCTION + " " +linst.getClass().getName()+"\n"+linst );
+                       
+                       //obtain serialized version of generated classes
+                       if( linst instanceof SpoofCPInstruction ) {
+                               Class<?> cla = ((SpoofCPInstruction) 
linst).getOperatorClass();
+                               clsMap.put(cla.getName(), 
CodegenUtils.getClassAsByteArray(cla.getName()));
                        }
                        
                        if( count > 0 )
@@ -967,7 +958,7 @@ public class ProgramConverter
         * @param instStr instruction string
         * @return instruction string with replacements
         */
-       public static String checkAndReplaceLiterals( String instStr )
+       private static String checkAndReplaceLiterals( String instStr )
        {
                String tmp = instStr;
                
@@ -1004,7 +995,7 @@ public class ProgramConverter
                return tmp;
        }
 
-       public static String serializeStringHashMap( HashMap<String,String> 
vars)
+       private static String serializeStringHashMap( HashMap<String,String> 
vars)
        {
                StringBuilder sb = new StringBuilder();
                int count=0;
@@ -1048,7 +1039,7 @@ public class ProgramConverter
                return sb.toString();
        }
 
-       public static String serializeStringArray( String[] vars)
+       private static String serializeStringArray( String[] vars)
        {
                StringBuilder sb = new StringBuilder();
                int count=0;
@@ -1066,7 +1057,7 @@ public class ProgramConverter
                return sb.toString();
        }
 
-       public static String serializeDataIdentifiers( 
ArrayList<DataIdentifier> var)
+       private static String serializeDataIdentifiers( 
ArrayList<DataIdentifier> var)
        {
                StringBuilder sb = new StringBuilder();
                int count=0;
@@ -1080,10 +1071,8 @@ public class ProgramConverter
                return sb.toString();
        }
 
-       public static String serializeDataIdentifier( DataIdentifier dat )
-       {
+       private static String serializeDataIdentifier( DataIdentifier dat ) {
                // SCHEMA: <name>|<datatype>|<valuetype>
-               
                StringBuilder sb = new StringBuilder();
                sb.append(dat.getName());
                sb.append(DATA_FIELD_DELIM);
@@ -1094,7 +1083,7 @@ public class ProgramConverter
                return sb.toString();
        }
 
-       public static String 
rSerializeFunctionProgramBlocks(HashMap<String,FunctionProgramBlock> pbs, 
HashSet<String> cand) 
+       private static String 
rSerializeFunctionProgramBlocks(HashMap<String,FunctionProgramBlock> pbs, 
HashSet<String> cand, HashMap<String, byte[]> clsMap) 
                throws DMLRuntimeException
        {
                StringBuilder sb = new StringBuilder();
@@ -1105,22 +1094,20 @@ public class ProgramConverter
                        if( !cand.contains(pb.getKey()) ) //skip function not 
included in the parfor body
                                continue;
                                
-                       if( count>0 )
-                       {
+                       if( count>0 ) {
                           sb.append( ELEMENT_DELIM );
                           sb.append( NEWLINE );
                        }
                        sb.append( pb.getKey() );
                        sb.append( KEY_VALUE_DELIM );
-                       sb.append( rSerializeProgramBlock( pb.getValue() ) );
-                       
+                       sb.append( rSerializeProgramBlock(pb.getValue(), 
clsMap) );
                        count++;
                }
                sb.append(NEWLINE);
                return sb.toString();
        }
 
-       public static String rSerializeProgramBlocks(ArrayList<ProgramBlock> 
pbs) 
+       private static String rSerializeProgramBlocks(ArrayList<ProgramBlock> 
pbs, HashMap<String, byte[]> clsMap) 
                throws DMLRuntimeException
        {
                StringBuilder sb = new StringBuilder();
@@ -1132,14 +1119,14 @@ public class ProgramConverter
                           sb.append( ELEMENT_DELIM );
                           sb.append(NEWLINE);
                        }
-                       sb.append( rSerializeProgramBlock(pb) );
+                       sb.append( rSerializeProgramBlock(pb, clsMap) );
                        count++;
                }
 
                return sb.toString();
        }
 
-       public static String rSerializeProgramBlock( ProgramBlock pb) 
+       private static String rSerializeProgramBlock( ProgramBlock pb, 
HashMap<String, byte[]> clsMap ) 
                throws DMLRuntimeException
        {
                StringBuilder sb = new StringBuilder();
@@ -1160,29 +1147,22 @@ public class ProgramConverter
                else //all generic program blocks
                        sb.append( PARFOR_PB_BEGIN );
                
-               //handle variables (not required only on top level)
-               /*sb.append( PARFOR_VARS_BEGIN );
-               sb.append( serializeVariables( ) ); 
-               sb.append( PARFOR_VARS_END );
-               sb.append( COMPONENTS_DELIM );
-               */
-               
                //handle body
                if( pb instanceof WhileProgramBlock )
                {
                        WhileProgramBlock wpb = (WhileProgramBlock) pb;
                        sb.append( PARFOR_INST_BEGIN );
-                       sb.append( serializeInstructions( wpb.getPredicate() ) 
);
+                       sb.append( serializeInstructions( wpb.getPredicate(), 
clsMap ) );
                        sb.append( PARFOR_INST_END );
                        sb.append( COMPONENTS_DELIM );
                        sb.append( wpb.getPredicateResultVar() );
                        sb.append( COMPONENTS_DELIM );
                        sb.append( PARFOR_INST_BEGIN );
-                       sb.append( serializeInstructions( 
wpb.getExitInstructions() ) );
+                       sb.append( serializeInstructions( 
wpb.getExitInstructions(), clsMap ) );
                        sb.append( PARFOR_INST_END );
                        sb.append( COMPONENTS_DELIM );
                        sb.append( PARFOR_PBS_BEGIN );
-                       sb.append( rSerializeProgramBlocks( 
wpb.getChildBlocks()) );
+                       sb.append( rSerializeProgramBlocks( 
wpb.getChildBlocks(), clsMap) );
                        sb.append( PARFOR_PBS_END );
                }
                else if ( pb instanceof ForProgramBlock && !(pb instanceof 
ParForProgramBlock ) )
@@ -1191,23 +1171,23 @@ public class ProgramConverter
                        sb.append( 
serializeStringArray(fpb.getIterablePredicateVars()) );
                        sb.append( COMPONENTS_DELIM );
                        sb.append( PARFOR_INST_BEGIN );
-                       sb.append( serializeInstructions( 
fpb.getFromInstructions() ) );
+                       sb.append( serializeInstructions( 
fpb.getFromInstructions(), clsMap ) );
                        sb.append( PARFOR_INST_END );   
                        sb.append( COMPONENTS_DELIM );
                        sb.append( PARFOR_INST_BEGIN );
-                       sb.append( serializeInstructions( 
fpb.getToInstructions() ) );
+                       sb.append( 
serializeInstructions(fpb.getToInstructions(), clsMap) );
                        sb.append( PARFOR_INST_END );   
                        sb.append( COMPONENTS_DELIM );
                        sb.append( PARFOR_INST_BEGIN );
-                       sb.append( serializeInstructions( 
fpb.getIncrementInstructions() ) );
+                       sb.append( 
serializeInstructions(fpb.getIncrementInstructions(), clsMap) );
                        sb.append( PARFOR_INST_END );
                        sb.append( COMPONENTS_DELIM );
                        sb.append( PARFOR_INST_BEGIN );
-                       sb.append( serializeInstructions( 
fpb.getExitInstructions() ) );
+                       sb.append( 
serializeInstructions(fpb.getExitInstructions(), clsMap) );
                        sb.append( PARFOR_INST_END );
                        sb.append( COMPONENTS_DELIM );
                        sb.append( PARFOR_PBS_BEGIN );
-                       sb.append( rSerializeProgramBlocks( 
fpb.getChildBlocks()) );
+                       sb.append( 
rSerializeProgramBlocks(fpb.getChildBlocks(), clsMap) );
                        sb.append( PARFOR_PBS_END );
                }
                else if ( pb instanceof ParForProgramBlock )
@@ -1225,44 +1205,44 @@ public class ProgramConverter
                        sb.append( serializeStringHashMap( 
pfpb.getParForParams()) ); //parameters of nested parfor
                        sb.append( COMPONENTS_DELIM );
                        sb.append( PARFOR_INST_BEGIN );
-                       sb.append( serializeInstructions( 
pfpb.getFromInstructions() ) );
+                       sb.append( 
serializeInstructions(pfpb.getFromInstructions(), clsMap) );
                        sb.append( PARFOR_INST_END );   
                        sb.append( COMPONENTS_DELIM );
                        sb.append( PARFOR_INST_BEGIN );
-                       sb.append( serializeInstructions( 
pfpb.getToInstructions() ) );
+                       sb.append( 
serializeInstructions(pfpb.getToInstructions(), clsMap) );
                        sb.append( PARFOR_INST_END );   
                        sb.append( COMPONENTS_DELIM );
                        sb.append( PARFOR_INST_BEGIN );
-                       sb.append( serializeInstructions( 
pfpb.getIncrementInstructions() ) );
+                       sb.append( 
serializeInstructions(pfpb.getIncrementInstructions(), clsMap) );
                        sb.append( PARFOR_INST_END );   
                        sb.append( COMPONENTS_DELIM );  
                        sb.append( PARFOR_INST_BEGIN );
-                       sb.append( serializeInstructions( 
pfpb.getExitInstructions() ) );
+                       sb.append( 
serializeInstructions(pfpb.getExitInstructions(), clsMap) );
                        sb.append( PARFOR_INST_END );
                        sb.append( COMPONENTS_DELIM );
                        sb.append( PARFOR_PBS_BEGIN );
-                       sb.append( rSerializeProgramBlocks( 
pfpb.getChildBlocks() ) );
+                       sb.append( rSerializeProgramBlocks( 
pfpb.getChildBlocks(), clsMap ) );
                        sb.append( PARFOR_PBS_END );
                }                               
                else if ( pb instanceof IfProgramBlock )
                {
                        IfProgramBlock ipb = (IfProgramBlock) pb;
                        sb.append( PARFOR_INST_BEGIN );
-                       sb.append( serializeInstructions( ipb.getPredicate() ) 
);
+                       sb.append( serializeInstructions(ipb.getPredicate(), 
clsMap) );
                        sb.append( PARFOR_INST_END );   
                        sb.append( COMPONENTS_DELIM );
                        sb.append( ipb.getPredicateResultVar() );
                        sb.append( COMPONENTS_DELIM );
                        sb.append( PARFOR_INST_BEGIN );
-                       sb.append( serializeInstructions( 
ipb.getExitInstructions() ) );
+                       sb.append( 
serializeInstructions(ipb.getExitInstructions(), clsMap) );
                        sb.append( PARFOR_INST_END );
                        sb.append( COMPONENTS_DELIM );
                        sb.append( PARFOR_PBS_BEGIN );
-                       sb.append( rSerializeProgramBlocks( 
ipb.getChildBlocksIfBody() ) );
+                       sb.append( 
rSerializeProgramBlocks(ipb.getChildBlocksIfBody(), clsMap) );
                        sb.append( PARFOR_PBS_END );
                        sb.append( COMPONENTS_DELIM );
                        sb.append( PARFOR_PBS_BEGIN );
-                       sb.append( rSerializeProgramBlocks( 
ipb.getChildBlocksElseBody() ) );
+                       sb.append( 
rSerializeProgramBlocks(ipb.getChildBlocksElseBody(), clsMap) );
                        sb.append( PARFOR_PBS_END );
                }
                else if( pb instanceof FunctionProgramBlock && !(pb instanceof 
ExternalFunctionProgramBlock) )
@@ -1274,11 +1254,11 @@ public class ProgramConverter
                        sb.append( serializeDataIdentifiers( 
fpb.getOutputParams() ) );
                        sb.append( COMPONENTS_DELIM );
                        sb.append( PARFOR_INST_BEGIN );
-                       sb.append( serializeInstructions( fpb.getInstructions() 
) );
+                       sb.append( serializeInstructions(fpb.getInstructions(), 
clsMap) );
                        sb.append( PARFOR_INST_END );   
                        sb.append( COMPONENTS_DELIM );
                        sb.append( PARFOR_PBS_BEGIN );
-                       sb.append( rSerializeProgramBlocks( 
fpb.getChildBlocks() ) );
+                       sb.append( 
rSerializeProgramBlocks(fpb.getChildBlocks(), clsMap) );
                        sb.append( PARFOR_PBS_END );
                        sb.append( COMPONENTS_DELIM );
                }
@@ -1302,17 +1282,16 @@ public class ProgramConverter
                        
                        sb.append( PARFOR_INST_BEGIN );
                        //create on construction anyway 
-                       //sb.append( serializeInstructions( 
fpb.getInstructions() ) ); 
                        sb.append( PARFOR_INST_END );   
                        sb.append( COMPONENTS_DELIM );
                        sb.append( PARFOR_PBS_BEGIN );
-                       sb.append( rSerializeProgramBlocks( 
fpb.getChildBlocks() ) );
+                       sb.append( 
rSerializeProgramBlocks(fpb.getChildBlocks(), clsMap) );
                        sb.append( PARFOR_PBS_END );
                }
                else //all generic program blocks
                {
                        sb.append( PARFOR_INST_BEGIN );
-                       sb.append( serializeInstructions( pb.getInstructions() 
) );
+                       sb.append( serializeInstructions(pb.getInstructions(), 
clsMap) );
                        sb.append( PARFOR_INST_END );
                }
                
@@ -1404,7 +1383,7 @@ public class ProgramConverter
                return prog;
        }
 
-       public static LocalVariableMap parseVariables(String in) 
+       private static LocalVariableMap parseVariables(String in) 
                throws DMLRuntimeException
        {
                LocalVariableMap ret = null;
@@ -1422,7 +1401,7 @@ public class ProgramConverter
                return ret;
        }
 
-       public static HashMap<String,FunctionProgramBlock> 
parseFunctionProgramBlocks( String in, Program prog, int id ) 
+       private static HashMap<String,FunctionProgramBlock> 
parseFunctionProgramBlocks( String in, Program prog, int id ) 
                throws DMLRuntimeException
        {
                HashMap<String,FunctionProgramBlock> ret = new HashMap<String, 
FunctionProgramBlock>();
@@ -1437,18 +1416,12 @@ public class ProgramConverter
                        String tmp1 = lvar.substring(0, index); // + 
CP_CHILD_THREAD+id;
                        String tmp2 = lvar.substring(index + 1);
                        ret.put(tmp1, 
(FunctionProgramBlock)rParseProgramBlock(tmp2, prog, id));
-                       
-                       //put first copy into prog (for future deep copies)
-                       //index = lvar2.indexOf("=");
-                       //tmp1 = lvar2.substring(0, index);
-                       //tmp2 = lvar2.substring(index + 1);
-                       //ret.put(tmp1, 
(FunctionProgramBlock)rParseProgramBlock(tmp2, prog, id));
                }
 
                return ret;
        }
 
-       public static ArrayList<ProgramBlock> rParseProgramBlocks(String in, 
Program prog, int id) 
+       private static ArrayList<ProgramBlock> rParseProgramBlocks(String in, 
Program prog, int id) 
                throws DMLRuntimeException
        {
                ArrayList<ProgramBlock> pbs = new ArrayList<ProgramBlock>();
@@ -1464,7 +1437,7 @@ public class ProgramConverter
                return pbs;
        }
 
-       public static ProgramBlock rParseProgramBlock( String in, Program prog, 
int id )
+       private static ProgramBlock rParseProgramBlock( String in, Program 
prog, int id )
                throws DMLRuntimeException
        {
                ProgramBlock pb = null;
@@ -1489,7 +1462,7 @@ public class ProgramConverter
                return pb;
        }
 
-       public static WhileProgramBlock rParseWhileProgramBlock( String in, 
Program prog, int id ) 
+       private static WhileProgramBlock rParseWhileProgramBlock( String in, 
Program prog, int id ) 
                throws DMLRuntimeException
        {
                String lin = in.substring( 
PARFOR_PB_WHILE.length(),in.length()-PARFOR_PB_END.length()); 
@@ -1516,7 +1489,7 @@ public class ProgramConverter
                return wpb;
        }
 
-       public static ForProgramBlock rParseForProgramBlock( String in, Program 
prog, int id ) 
+       private static ForProgramBlock rParseForProgramBlock( String in, 
Program prog, int id ) 
                throws DMLRuntimeException
        {
                String lin = in.substring( 
PARFOR_PB_FOR.length(),in.length()-PARFOR_PB_END.length()); 
@@ -1549,7 +1522,7 @@ public class ProgramConverter
                return fpb;
        }
 
-       public static ParForProgramBlock rParseParForProgramBlock( String in, 
Program prog, int id ) 
+       private static ParForProgramBlock rParseParForProgramBlock( String in, 
Program prog, int id ) 
                throws DMLRuntimeException
        {
                String lin = in.substring( 
PARFOR_PB_PARFOR.length(),in.length()-PARFOR_PB_END.length()); 
@@ -1586,7 +1559,7 @@ public class ProgramConverter
                return pfpb;
        }
 
-       public static IfProgramBlock rParseIfProgramBlock( String in, Program 
prog, int id ) 
+       private static IfProgramBlock rParseIfProgramBlock( String in, Program 
prog, int id ) 
                throws DMLRuntimeException
        {
                String lin = in.substring( 
PARFOR_PB_IF.length(),in.length()-PARFOR_PB_END.length()); 
@@ -1615,7 +1588,7 @@ public class ProgramConverter
                return ipb;
        }
 
-       public static FunctionProgramBlock rParseFunctionProgramBlock( String 
in, Program prog, int id ) 
+       private static FunctionProgramBlock rParseFunctionProgramBlock( String 
in, Program prog, int id ) 
                throws DMLRuntimeException
        {
                String lin = in.substring( 
PARFOR_PB_FC.length(),in.length()-PARFOR_PB_END.length()); 
@@ -1643,7 +1616,7 @@ public class ProgramConverter
                return fpb;
        }
 
-       public static ExternalFunctionProgramBlock 
rParseExternalFunctionProgramBlock( String in, Program prog, int id ) 
+       private static ExternalFunctionProgramBlock 
rParseExternalFunctionProgramBlock( String in, Program prog, int id ) 
                throws DMLRuntimeException
        {
                String lin = in.substring( 
PARFOR_PB_EFC.length(),in.length()-PARFOR_PB_END.length()); 
@@ -1677,7 +1650,7 @@ public class ProgramConverter
                return efpb;
        }
 
-       public static ProgramBlock rParseGenericProgramBlock( String in, 
Program prog, int id ) 
+       private static ProgramBlock rParseGenericProgramBlock( String in, 
Program prog, int id ) 
                throws DMLRuntimeException
        {
                String lin = in.substring( 
PARFOR_PB_BEGIN.length(),in.length()-PARFOR_PB_END.length()); 
@@ -1691,7 +1664,7 @@ public class ProgramConverter
                return pb;
        }
 
-       public static ArrayList<Instruction> parseInstructions( String in, int 
id ) 
+       private static ArrayList<Instruction> parseInstructions( String in, int 
id ) 
                throws DMLRuntimeException
        {
                ArrayList<Instruction> insts = new ArrayList<Instruction>();  
@@ -1718,7 +1691,7 @@ public class ProgramConverter
                return insts;
        }
 
-       public static HashMap<String,String> parseStringHashMap( String in )
+       private static HashMap<String,String> parseStringHashMap( String in )
        {
                HashMap<String,String> vars = new HashMap<String, String>();
                StringTokenizer st = new StringTokenizer(in,ELEMENT_DELIM);
@@ -1734,7 +1707,7 @@ public class ProgramConverter
                return vars;
        }
 
-       public static ArrayList<String> parseStringArrayList( String in )
+       private static ArrayList<String> parseStringArrayList( String in )
        {
                ArrayList<String> vars = new ArrayList<String>();
                StringTokenizer st = new StringTokenizer(in,ELEMENT_DELIM);
@@ -1747,7 +1720,7 @@ public class ProgramConverter
                return vars;
        }
 
-       public static String[] parseStringArray( String in )
+       private static String[] parseStringArray( String in )
        {
                StringTokenizer st = new StringTokenizer(in, ELEMENT_DELIM);
                int len = st.countTokens();
@@ -1763,7 +1736,7 @@ public class ProgramConverter
                return a;
        }
 
-       public static ArrayList<DataIdentifier> parseDataIdentifiers( String in 
)
+       private static ArrayList<DataIdentifier> parseDataIdentifiers( String 
in )
        {
                ArrayList<DataIdentifier> vars = new 
ArrayList<DataIdentifier>();
                StringTokenizer st = new StringTokenizer(in, ELEMENT_DELIM);
@@ -1777,7 +1750,7 @@ public class ProgramConverter
                return vars;
        }
 
-       public static DataIdentifier parseDataIdentifier( String in )
+       private static DataIdentifier parseDataIdentifier( String in )
        {
                StringTokenizer st = new StringTokenizer(in, DATA_FIELD_DELIM);
                String name = st.nextToken();
@@ -1867,7 +1840,7 @@ public class ProgramConverter
                return ret;
        }
 
-       public static ExecutionContext parseExecutionContext(String in, Program 
prog) 
+       private static ExecutionContext parseExecutionContext(String in, 
Program prog) 
                throws DMLRuntimeException
        {
                ExecutionContext ec = null;
@@ -1884,9 +1857,7 @@ public class ProgramConverter
                return ec;
        }
        
-       public static void parseAndSetAdditionalConfigurations(String conf)
-       {
-               //set statistics flag
+       private static void parseAndSetAdditionalConfigurations(String conf) {
                String[] statsFlag = conf.split("=");
                DMLScript.STATISTICS = Boolean.parseBoolean(statsFlag[1]);
        }
@@ -1903,7 +1874,7 @@ public class ProgramConverter
         * @return instruction
         * @throws DMLRuntimeException if DMLRuntimeException occurs
         */
-       public static Instruction saveReplaceThreadID( Instruction inst, String 
pattern, String replacement ) 
+       private static Instruction saveReplaceThreadID( Instruction inst, 
String pattern, String replacement ) 
                throws DMLRuntimeException
        {
                //currently known, relevant instructions: createvar, rand, seq, 
extfunct, 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d62b2fce/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java
index 0c4b570..6f46ab8 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java
@@ -19,6 +19,7 @@
 
 package org.apache.sysml.runtime.controlprogram.parfor;
 
+import java.util.HashMap;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -56,9 +57,9 @@ public class RemoteDPParForSpark
        
        protected static final Log LOG = 
LogFactory.getLog(RemoteDPParForSpark.class.getName());
 
-       public static RemoteParForJobReturn runJob(long pfid, String itervar, 
String matrixvar, String program, String resultFile, 
-                       MatrixObject input, ExecutionContext ec, 
PDataPartitionFormat dpf, OutputInfo oi, boolean tSparseCol, //config params
-                       boolean enableCPCaching, int numReducers )  //opt params
+       public static RemoteParForJobReturn runJob(long pfid, String itervar, 
String matrixvar, String program, HashMap<String, byte[]> clsMap,
+                       String resultFile, MatrixObject input, ExecutionContext 
ec, PDataPartitionFormat dpf, OutputInfo oi, 
+                       boolean tSparseCol, boolean enableCPCaching, int 
numReducers ) 
                throws DMLRuntimeException
        {
                String jobname = "ParFor-DPESP";
@@ -84,7 +85,7 @@ public class RemoteDPParForSpark
                
                //core parfor datapartition-execute
                DataPartitionerRemoteSparkMapper dpfun = new 
DataPartitionerRemoteSparkMapper(mc, ii, oi, dpf);
-               RemoteDPParForSparkWorker efun = new 
RemoteDPParForSparkWorker(program, matrixvar, itervar, 
+               RemoteDPParForSparkWorker efun = new 
RemoteDPParForSparkWorker(program, clsMap, matrixvar, itervar, 
                                          enableCPCaching, mc, tSparseCol, dpf, 
oi, aTasks, aIters);
                List<Tuple2<Long,String>> out = 
                                in.flatMapToPair(dpfun)       //partition the 
input blocks

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d62b2fce/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
index 458f149..67d59dc 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
@@ -21,13 +21,16 @@ package org.apache.sysml.runtime.controlprogram.parfor;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Map.Entry;
 
 import org.apache.hadoop.io.Writable;
 import org.apache.spark.TaskContext;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.util.LongAccumulator;
 import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.codegen.CodegenUtils;
 import 
org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
 import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
@@ -49,6 +52,7 @@ public class RemoteDPParForSparkWorker extends ParWorker 
implements PairFlatMapF
        private static final long serialVersionUID = 30223759283155139L;
        
        private final String  _prog;
+       private final HashMap<String, byte[]> _clsMap;
        private final boolean _caching;
        private final String _inputVar;
        private final String _iterVar;
@@ -64,10 +68,13 @@ public class RemoteDPParForSparkWorker extends ParWorker 
implements PairFlatMapF
        private final LongAccumulator _aTasks;
        private final LongAccumulator _aIters;
        
-       public RemoteDPParForSparkWorker(String program, String inputVar, 
String iterVar, boolean cpCaching, MatrixCharacteristics mc, boolean 
tSparseCol, PDataPartitionFormat dpf, OutputInfo oinfo, LongAccumulator atasks, 
LongAccumulator aiters) 
+       public RemoteDPParForSparkWorker(String program, HashMap<String, 
byte[]> clsMap, String inputVar, String iterVar, 
+                       boolean cpCaching, MatrixCharacteristics mc, boolean 
tSparseCol, PDataPartitionFormat dpf, OutputInfo oinfo, 
+                       LongAccumulator atasks, LongAccumulator aiters) 
                throws DMLRuntimeException
        {
                _prog = program;
+               _clsMap = clsMap;
                _caching = cpCaching;
                _inputVar = inputVar;
                _iterVar = iterVar;
@@ -93,7 +100,7 @@ public class RemoteDPParForSparkWorker extends ParWorker 
implements PairFlatMapF
                ArrayList<Tuple2<Long,String>> ret = new 
ArrayList<Tuple2<Long,String>>();
                
                //lazy parworker initialization
-               configureWorker( TaskContext.get().taskAttemptId() ); 
//requires Spark 1.3
+               configureWorker( TaskContext.get().taskAttemptId() );
        
                //process all matrix partitions of this data partition
                MatrixBlock partition = null;
@@ -138,6 +145,12 @@ public class RemoteDPParForSparkWorker extends ParWorker 
implements PairFlatMapF
        {
                _workerID = ID;
                
+               //initialize codegen class cache (before program parsing)
+               synchronized( CodegenUtils.class ) {
+                       for( Entry<String, byte[]> e : _clsMap.entrySet() )
+                               CodegenUtils.loadClass(e.getKey(), 
e.getValue());
+               }
+               
                //parse and setup parfor body program
                ParForBody body = ProgramConverter.parseParForBody(_prog, 
(int)_workerID);
                _childBlocks = body.getChildBlocks();

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d62b2fce/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java
index 9d3f0f3..5e69b19 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java
@@ -19,6 +19,7 @@
 
 package org.apache.sysml.runtime.controlprogram.parfor;
 
+import java.util.HashMap;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -53,8 +54,8 @@ public class RemoteParForSpark
        
        protected static final Log LOG = 
LogFactory.getLog(RemoteParForSpark.class.getName());
 
-       public static RemoteParForJobReturn runJob(long pfid, String program, 
List<Task> tasks, ExecutionContext ec,
-                                                          boolean cpCaching, 
int numMappers) 
+       public static RemoteParForJobReturn runJob(long pfid, String program, 
HashMap<String, byte[]> clsMap, 
+                       List<Task> tasks, ExecutionContext ec, boolean 
cpCaching, int numMappers) 
                throws DMLRuntimeException  
        {
                String jobname = "ParFor-ESP";
@@ -69,7 +70,7 @@ public class RemoteParForSpark
                
                //run remote_spark parfor job 
                //(w/o lazy evaluation to fit existing parfor framework, e.g., 
result merge)
-               RemoteParForSparkWorker func = new 
RemoteParForSparkWorker(program, cpCaching, aTasks, aIters);
+               RemoteParForSparkWorker func = new 
RemoteParForSparkWorker(program, clsMap, cpCaching, aTasks, aIters);
                List<Tuple2<Long,String>> out = 
                                sc.parallelize( tasks, numMappers )  //create 
rdd of parfor tasks
                          .flatMapToPair( func )             //execute parfor 
tasks 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d62b2fce/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
index 2ea802d..6ed5d1f 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
@@ -21,12 +21,15 @@ package org.apache.sysml.runtime.controlprogram.parfor;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Map.Entry;
 
 import org.apache.spark.TaskContext;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.util.LongAccumulator;
 import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.codegen.CodegenUtils;
 import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
 import 
org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysml.runtime.controlprogram.parfor.util.IDHandler;
@@ -39,16 +42,18 @@ public class RemoteParForSparkWorker extends ParWorker 
implements PairFlatMapFun
        private static final long serialVersionUID = -3254950138084272296L;
 
        private final String  _prog;
+       private final HashMap<String, byte[]> _clsMap;
        private boolean _initialized = false;
        private boolean _caching = true;
        
        private final LongAccumulator _aTasks;
        private final LongAccumulator _aIters;
        
-       public RemoteParForSparkWorker(String program, boolean cpCaching, 
LongAccumulator atasks, LongAccumulator aiters) 
+       public RemoteParForSparkWorker(String program, HashMap<String, byte[]> 
clsMap, boolean cpCaching, LongAccumulator atasks, LongAccumulator aiters) 
                throws DMLRuntimeException
        {
                _prog = program;
+               _clsMap = clsMap;
                _initialized = false;
                _caching = cpCaching;
                
@@ -88,6 +93,12 @@ public class RemoteParForSparkWorker extends ParWorker 
implements PairFlatMapFun
        {
                _workerID = ID;
                
+               //initialize codegen class cache (before program parsing)
+               synchronized( CodegenUtils.class ) {
+                       for( Entry<String, byte[]> e : _clsMap.entrySet() )
+                               CodegenUtils.loadClass(e.getKey(), 
e.getValue());
+               }
+               
                //parse and setup parfor body program
                ParForBody body = ProgramConverter.parseParForBody(_prog, 
(int)_workerID);
                _childBlocks = body.getChildBlocks();

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d62b2fce/src/main/java/org/apache/sysml/runtime/instructions/cp/SpoofCPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/cp/SpoofCPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/cp/SpoofCPInstruction.java
index 61313d7..52bc926 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/cp/SpoofCPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/cp/SpoofCPInstruction.java
@@ -44,15 +44,18 @@ public class SpoofCPInstruction extends 
ComputationCPInstruction
                _numThreads = k;
                _in = in;
        }
+       
+       public Class<?> getOperatorClass() {
+               return _class;
+       }
 
        public static SpoofCPInstruction parseInstruction(String str) 
                throws DMLRuntimeException 
        {
                String[] parts = 
InstructionUtils.getInstructionPartsWithValueType(str);
                
-               //String opcode = parts[0];
                ArrayList<CPOperand> inlist = new ArrayList<CPOperand>();
-               Class<?> cla = CodegenUtils.loadClass(parts[1], null);
+               Class<?> cla = CodegenUtils.loadClass(parts[1]);
                String opcode =  parts[0] + CodegenUtils.getSpoofType(cla);
                
                for( int i=2; i<parts.length-2; i++ )

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d62b2fce/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java
index 15b0751..41bcffb 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java
@@ -73,7 +73,7 @@ public class SpoofSPInstruction extends SPInstruction
                
                //String opcode = parts[0];
                ArrayList<CPOperand> inlist = new ArrayList<CPOperand>();
-               Class<?> cls = CodegenUtils.loadClass(parts[1], null);
+               Class<?> cls = CodegenUtils.loadClass(parts[1]);
                byte[] classBytes = CodegenUtils.getClassAsByteArray(parts[1]);
                String opcode =  parts[0] + CodegenUtils.getSpoofType(cls);
                


Reply via email to