[SYSTEMML-1360][SYSTEMML-1290] Codegen instructions in parfor spark jobs Our code generator compiles and loads generated operators at the driver. Without shipping these classes to the executors, parfor spark jobs with spoof instructions in the body fail due to missing classes. This patch now explicitly ships the byte representation of necessary classes (referenced from instructions in the parfor body) to parfor workers.
Furthermore, this patch also hardens the transfer of generated classes for distributed spoof spark instructions by shipping the actual byte code and modifying the compilation chain to avoid deleting generated class files (which created a brittle dependence on the class cache). Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/d62b2fce Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/d62b2fce Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/d62b2fce Branch: refs/heads/master Commit: d62b2fced61e6000081e93ca0298a9a16b86d437 Parents: 9b69f36 Author: Matthias Boehm <[email protected]> Authored: Thu Mar 2 14:26:38 2017 -0800 Committer: Matthias Boehm <[email protected]> Committed: Thu Mar 2 14:26:38 2017 -0800 ---------------------------------------------------------------------- .../java/org/apache/sysml/api/DMLScript.java | 7 +- .../sysml/runtime/codegen/CodegenUtils.java | 74 ++------ .../controlprogram/ParForProgramBlock.java | 14 +- .../controlprogram/parfor/ProgramConverter.java | 187 ++++++++----------- .../parfor/RemoteDPParForSpark.java | 9 +- .../parfor/RemoteDPParForSparkWorker.java | 17 +- .../parfor/RemoteParForSpark.java | 7 +- .../parfor/RemoteParForSparkWorker.java | 13 +- .../instructions/cp/SpoofCPInstruction.java | 7 +- .../instructions/spark/SpoofSPInstruction.java | 2 +- 10 files changed, 153 insertions(+), 184 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d62b2fce/src/main/java/org/apache/sysml/api/DMLScript.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/api/DMLScript.java b/src/main/java/org/apache/sysml/api/DMLScript.java index e0eff6c..25c03ea 100644 --- a/src/main/java/org/apache/sysml/api/DMLScript.java +++ b/src/main/java/org/apache/sysml/api/DMLScript.java @@ -581,7 +581,7 @@ public class DMLScript AParserWrapper parser = AParserWrapper.createParser(parsePyDML); DMLProgram prog = parser.parse(DML_FILE_PATH_ANTLR_PARSER, dmlScriptStr, argVals); - //Step 4: construct HOP DAGs (incl LVA and validate) + //Step 4: construct HOP DAGs (incl LVA, validate, and setup) DMLTranslator dmlt = new DMLTranslator(prog); dmlt.liveVariableAnalysis(prog); dmlt.validateParseTree(prog); @@ -592,6 +592,9 @@ public class DMLScript dmlt.printHops(prog); DMLTranslator.resetHopsDAGVisitStatus(prog); } + + //init working directories (before usage by following compilation steps) + initHadoopExecution( dmlconf ); //Step 5: rewrite HOP DAGs (incl IPA and memory estimates) dmlt.rewriteHopsDAG(prog); @@ -673,8 +676,6 @@ public class DMLScript ExecutionContext ec = null; try { - initHadoopExecution( dmlconf ); - //run execute (w/ exception handling to ensure proper shutdown) ec = ExecutionContextFactory.createContext(rtprog); rtprog.execute( ec ); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d62b2fce/src/main/java/org/apache/sysml/runtime/codegen/CodegenUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/codegen/CodegenUtils.java b/src/main/java/org/apache/sysml/runtime/codegen/CodegenUtils.java index 54b421c..f5470de 100644 --- a/src/main/java/org/apache/sysml/runtime/codegen/CodegenUtils.java +++ b/src/main/java/org/apache/sysml/runtime/codegen/CodegenUtils.java @@ -19,11 +19,9 @@ package org.apache.sysml.runtime.codegen; -import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; import java.io.InputStream; -import java.io.ObjectOutputStream; import java.net.URL; import java.net.URLClassLoader; import java.util.Arrays; @@ -48,9 +46,7 @@ import org.apache.sysml.utils.Statistics; public class CodegenUtils { - //cache to reuse compiled and loaded classes (this is also a workaround for classes, - //compiled during initial compilation and subsequently loaded as the working directory - //is cleaned up just before the actual execution + //cache to reuse compiled and loaded classes private static ConcurrentHashMap<String, Class<?>> _cache = new ConcurrentHashMap<String,Class<?>>(); private static String _workingDir = null; @@ -128,7 +124,13 @@ public class CodegenUtils return ret; } - public static Class<?> loadClass(String name, byte[] classBytes) throws DMLRuntimeException { + public static Class<?> loadClass(String name) throws DMLRuntimeException { + return loadClass(name, null); + } + + public static Class<?> loadClass(String name, byte[] classBytes) + throws DMLRuntimeException + { //reuse existing compiled class Class<?> ret = _cache.get(name); if( ret != null ) @@ -188,16 +190,10 @@ public class CodegenUtils public static byte[] getClassAsByteArray(String name) throws DMLRuntimeException { - //reuse existing compiled class - Class<?> cls = _cache.get(name); - if( cls != null ) - return getClassAsByteArray(cls); - - String classAsPath = name.replace('.', '/') + ".class"; URLClassLoader classLoader = null; - byte[] ret = null; + InputStream stream = null; try { //dynamically load compiled class @@ -205,56 +201,18 @@ public class CodegenUtils classLoader = new URLClassLoader( new URL[]{new File(_workingDir).toURI().toURL(), runDir}, CodegenUtils.class.getClassLoader()); - InputStream stream = classLoader.getResourceAsStream(classAsPath); - ret = IOUtils.toByteArray(stream); + stream = classLoader.getResourceAsStream(classAsPath); + return IOUtils.toByteArray(stream); } catch (IOException e) { throw new DMLRuntimeException(e); } finally { IOUtilFunctions.closeSilently(classLoader); + IOUtilFunctions.closeSilently(stream); } - - return ret; } - - public static byte[] getClassAsByteArray(Class<?> cls) - throws DMLRuntimeException - { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - try { - ObjectOutputStream oos = new ObjectOutputStream(bos); - oos.writeObject(cls); - oos.flush(); - return bos.toByteArray(); - } - catch( IOException e ) { - throw new DMLRuntimeException(e); - } - finally { - IOUtilFunctions.closeSilently(bos); - } - } - - private static void createWorkingDir() throws DMLRuntimeException { - if( _workingDir != null ) - return; - String tmp = LocalFileUtils.getWorkingDir(LocalFileUtils.CATEGORY_CODEGEN); - LocalFileUtils.createLocalFileIfNotExist(tmp); - _workingDir = tmp; - } - - public static URL[] getUrls() throws DMLRuntimeException { - try { - URL runDir = CodegenUtils.class.getProtectionDomain().getCodeSource().getLocation(); - return new URL[]{new File(_workingDir).toURI().toURL(), runDir}; - } - catch(Exception e) { - throw new DMLRuntimeException(e); - } - } - public static String getSpoofType(Class<?> cls) { if(cls.getSuperclass() == SpoofCellwise.class) return "Cell" + cls.getName().split("\\.")[1]; @@ -269,4 +227,12 @@ public class CodegenUtils public static void clearClassCache() { _cache.clear(); } + + private static void createWorkingDir() throws DMLRuntimeException { + if( _workingDir != null ) + return; + String tmp = LocalFileUtils.getWorkingDir(LocalFileUtils.CATEGORY_CODEGEN); + LocalFileUtils.createLocalFileIfNotExist(tmp); + _workingDir = tmp; + } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d62b2fce/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java index 4cdfa7b..1023a80 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java @@ -986,8 +986,9 @@ public class ParForProgramBlock extends ForProgramBlock // Step 1) init parallel workers (serialize PBs) // NOTES: each mapper changes filenames with regard to his ID as we submit a single job, // cannot reuse serialized string, since variables are serialized as well. - ParForBody body = new ParForBody( _childBlocks, _resultVars, ec ); - String program = ProgramConverter.serializeParForBody( body ); + ParForBody body = new ParForBody(_childBlocks, _resultVars, ec); + HashMap<String, byte[]> clsMap = new HashMap<String, byte[]>(); + String program = ProgramConverter.serializeParForBody(body, clsMap); if( _monitor ) StatisticMonitor.putPFStat(_ID, Stat.PARFOR_INIT_PARWRK_T, time.stop()); @@ -1008,7 +1009,7 @@ public class ParForProgramBlock extends ForProgramBlock // Step 3) submit Spark parfor job (no lazy evaluation, since collect on result) //MatrixObject colocatedDPMatrixObj = (_colocatedDPMatrix!=null)? (MatrixObject)ec.getVariable(_colocatedDPMatrix) : null; - RemoteParForJobReturn ret = RemoteParForSpark.runJob(_ID, program, tasks, ec, _enableCPCaching, _numThreads); + RemoteParForJobReturn ret = RemoteParForSpark.runJob(_ID, program, clsMap, tasks, ec, _enableCPCaching, _numThreads); if( _monitor ) StatisticMonitor.putPFStat(_ID, Stat.PARFOR_WAIT_EXEC_T, time.stop()); @@ -1050,7 +1051,8 @@ public class ParForProgramBlock extends ForProgramBlock // NOTES: each mapper changes filenames with regard to his ID as we submit a single job, // cannot reuse serialized string, since variables are serialized as well. ParForBody body = new ParForBody( _childBlocks, _resultVars, ec ); - String program = ProgramConverter.serializeParForBody( body ); + HashMap<String, byte[]> clsMap = new HashMap<String, byte[]>(); + String program = ProgramConverter.serializeParForBody( body, clsMap ); if( _monitor ) StatisticMonitor.putPFStat(_ID, Stat.PARFOR_INIT_PARWRK_T, time.stop()); @@ -1071,8 +1073,8 @@ public class ParForProgramBlock extends ForProgramBlock OutputInfo inputOI = ((inputMatrix.getSparsity()<0.1 && inputDPF==PDataPartitionFormat.COLUMN_WISE)|| (inputMatrix.getSparsity()<0.001 && inputDPF==PDataPartitionFormat.ROW_WISE))? OutputInfo.BinaryCellOutputInfo : OutputInfo.BinaryBlockOutputInfo; - RemoteParForJobReturn ret = RemoteDPParForSpark.runJob(_ID, itervar.getName(), _colocatedDPMatrix, program, resultFile, - inputMatrix, ec, inputDPF, inputOI, _tSparseCol, _enableCPCaching, _numThreads ); + RemoteParForJobReturn ret = RemoteDPParForSpark.runJob(_ID, itervar.getName(), _colocatedDPMatrix, program, clsMap, + resultFile, inputMatrix, ec, inputDPF, inputOI, _tSparseCol, _enableCPCaching, _numThreads ); if( _monitor ) StatisticMonitor.putPFStat(_ID, Stat.PARFOR_WAIT_EXEC_T, time.stop()); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d62b2fce/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java index 5cc9ca1..00e72c4 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java @@ -47,6 +47,7 @@ import org.apache.sysml.parser.Expression.DataType; import org.apache.sysml.parser.Expression.ValueType; import org.apache.sysml.parser.WhileStatementBlock; import org.apache.sysml.runtime.DMLRuntimeException; +import org.apache.sysml.runtime.codegen.CodegenUtils; import org.apache.sysml.runtime.controlprogram.ExternalFunctionProgramBlock; import org.apache.sysml.runtime.controlprogram.ExternalFunctionProgramBlockCP; import org.apache.sysml.runtime.controlprogram.ForProgramBlock; @@ -75,6 +76,7 @@ import org.apache.sysml.runtime.instructions.cp.DoubleObject; import org.apache.sysml.runtime.instructions.cp.FunctionCallCPInstruction; import org.apache.sysml.runtime.instructions.cp.IntObject; import org.apache.sysml.runtime.instructions.cp.ScalarObject; +import org.apache.sysml.runtime.instructions.cp.SpoofCPInstruction; import org.apache.sysml.runtime.instructions.cp.StringObject; import org.apache.sysml.runtime.instructions.cp.VariableCPInstruction; import org.apache.sysml.runtime.instructions.gpu.GPUInstruction; @@ -88,17 +90,14 @@ import org.apache.sysml.runtime.matrix.data.OutputInfo; import org.apache.sysml.udf.ExternalFunctionInvocationInstruction; /** - * Static functionalities for - * * creating deep copies of program blocks, instructions, function program blocks - * * serializing and parsing of programs, program blocks, functions program blocks + * Program converter functionalities for + * (1) creating deep copies of program blocks, instructions, function program blocks, and + * (2) serializing and parsing of programs, program blocks, functions program blocks. * - * TODO: CV, EL, ELUse program blocks not considered so far (not for BI 2.0 release) - * TODO: rewrite class to instance-based invocation (grown gradually and now inappropriate design) - * */ +//TODO: rewrite class to instance-based invocation (grown gradually and now inappropriate design) public class ProgramConverter { - protected static final Log LOG = LogFactory.getLog(ProgramConverter.class.getName()); //use escaped unicodes for separators in order to prevent string conflict @@ -721,7 +720,11 @@ public class ProgramConverter // SERIALIZATION //////////////////////////////// - public static String serializeParForBody( ParForBody body ) + public static String serializeParForBody( ParForBody body ) throws DMLRuntimeException { + return serializeParForBody(body, new HashMap<String, byte[]>()); + } + + public static String serializeParForBody( ParForBody body, HashMap<String,byte[]> clsMap ) throws DMLRuntimeException { ArrayList<ProgramBlock> pbs = body.getChildBlocks(); @@ -755,14 +758,14 @@ public class ProgramConverter //handle program sb.append( PARFOR_PROG_BEGIN ); sb.append( NEWLINE ); - sb.append( serializeProgram(prog, pbs) ); + sb.append( serializeProgram(prog, pbs, clsMap) ); sb.append( PARFOR_PROG_END ); sb.append( NEWLINE ); sb.append( COMPONENTS_DELIM ); sb.append( NEWLINE ); //handle result variable names - sb.append( serializeStringArrayList( rVnames ) ); + sb.append( serializeStringArrayList(rVnames) ); sb.append( COMPONENTS_DELIM ); //handle execution context @@ -778,7 +781,7 @@ public class ProgramConverter //handle program blocks -- ONLY instructions, not variables. sb.append( PARFOR_PBS_BEGIN ); sb.append( NEWLINE ); - sb.append( rSerializeProgramBlocks(pbs) ); + sb.append( rSerializeProgramBlocks(pbs, clsMap) ); sb.append( PARFOR_PBS_END ); sb.append( NEWLINE ); @@ -787,7 +790,7 @@ public class ProgramConverter return sb.toString(); } - public static String serializeProgram( Program prog, ArrayList<ProgramBlock> pbs ) + private static String serializeProgram( Program prog, ArrayList<ProgramBlock> pbs, HashMap<String, byte[]> clsMap ) throws DMLRuntimeException { //note program contains variables, programblocks and function program blocks @@ -797,10 +800,10 @@ public class ProgramConverter HashSet<String> cand = new HashSet<String>(); rFindSerializationCandidates(pbs, cand); - return rSerializeFunctionProgramBlocks( fpb, cand ); + return rSerializeFunctionProgramBlocks( fpb, cand, clsMap ); } - public static void rFindSerializationCandidates( ArrayList<ProgramBlock> pbs, HashSet<String> cand ) + private static void rFindSerializationCandidates( ArrayList<ProgramBlock> pbs, HashSet<String> cand ) throws DMLRuntimeException { for( ProgramBlock pb : pbs ) @@ -842,7 +845,7 @@ public class ProgramConverter } } - public static String serializeVariables (LocalVariableMap vars) + private static String serializeVariables (LocalVariableMap vars) throws DMLRuntimeException { StringBuilder sb = new StringBuilder(); @@ -914,26 +917,12 @@ public class ProgramConverter return sb.toString(); } - public static String serializeExecutionContext( ExecutionContext ec ) - throws DMLRuntimeException - { - String ret = null; - - if( ec != null ) - { - LocalVariableMap vars = ec.getVariables(); - ret = serializeVariables( vars ); - } - else - { - ret = EMPTY; - } - - return ret; + private static String serializeExecutionContext( ExecutionContext ec ) throws DMLRuntimeException { + return (ec != null) ? serializeVariables( ec.getVariables() ) : EMPTY; } @SuppressWarnings("all") - public static String serializeInstructions( ArrayList<Instruction> inst ) + private static String serializeInstructions( ArrayList<Instruction> inst, HashMap<String, byte[]> clsMap ) throws DMLRuntimeException { StringBuilder sb = new StringBuilder(); @@ -941,11 +930,13 @@ public class ProgramConverter for( Instruction linst : inst ) { //check that only cp instruction are transmitted - if( !( linst instanceof CPInstruction - || linst instanceof SPInstruction - || linst instanceof ExternalFunctionInvocationInstruction ) ) - { + if( !( linst instanceof CPInstruction || linst instanceof ExternalFunctionInvocationInstruction ) ) throw new DMLRuntimeException( NOT_SUPPORTED_MR_INSTRUCTION + " " +linst.getClass().getName()+"\n"+linst ); + + //obtain serialized version of generated classes + if( linst instanceof SpoofCPInstruction ) { + Class<?> cla = ((SpoofCPInstruction) linst).getOperatorClass(); + clsMap.put(cla.getName(), CodegenUtils.getClassAsByteArray(cla.getName())); } if( count > 0 ) @@ -967,7 +958,7 @@ public class ProgramConverter * @param instStr instruction string * @return instruction string with replacements */ - public static String checkAndReplaceLiterals( String instStr ) + private static String checkAndReplaceLiterals( String instStr ) { String tmp = instStr; @@ -1004,7 +995,7 @@ public class ProgramConverter return tmp; } - public static String serializeStringHashMap( HashMap<String,String> vars) + private static String serializeStringHashMap( HashMap<String,String> vars) { StringBuilder sb = new StringBuilder(); int count=0; @@ -1048,7 +1039,7 @@ public class ProgramConverter return sb.toString(); } - public static String serializeStringArray( String[] vars) + private static String serializeStringArray( String[] vars) { StringBuilder sb = new StringBuilder(); int count=0; @@ -1066,7 +1057,7 @@ public class ProgramConverter return sb.toString(); } - public static String serializeDataIdentifiers( ArrayList<DataIdentifier> var) + private static String serializeDataIdentifiers( ArrayList<DataIdentifier> var) { StringBuilder sb = new StringBuilder(); int count=0; @@ -1080,10 +1071,8 @@ public class ProgramConverter return sb.toString(); } - public static String serializeDataIdentifier( DataIdentifier dat ) - { + private static String serializeDataIdentifier( DataIdentifier dat ) { // SCHEMA: <name>|<datatype>|<valuetype> - StringBuilder sb = new StringBuilder(); sb.append(dat.getName()); sb.append(DATA_FIELD_DELIM); @@ -1094,7 +1083,7 @@ public class ProgramConverter return sb.toString(); } - public static String rSerializeFunctionProgramBlocks(HashMap<String,FunctionProgramBlock> pbs, HashSet<String> cand) + private static String rSerializeFunctionProgramBlocks(HashMap<String,FunctionProgramBlock> pbs, HashSet<String> cand, HashMap<String, byte[]> clsMap) throws DMLRuntimeException { StringBuilder sb = new StringBuilder(); @@ -1105,22 +1094,20 @@ public class ProgramConverter if( !cand.contains(pb.getKey()) ) //skip function not included in the parfor body continue; - if( count>0 ) - { + if( count>0 ) { sb.append( ELEMENT_DELIM ); sb.append( NEWLINE ); } sb.append( pb.getKey() ); sb.append( KEY_VALUE_DELIM ); - sb.append( rSerializeProgramBlock( pb.getValue() ) ); - + sb.append( rSerializeProgramBlock(pb.getValue(), clsMap) ); count++; } sb.append(NEWLINE); return sb.toString(); } - public static String rSerializeProgramBlocks(ArrayList<ProgramBlock> pbs) + private static String rSerializeProgramBlocks(ArrayList<ProgramBlock> pbs, HashMap<String, byte[]> clsMap) throws DMLRuntimeException { StringBuilder sb = new StringBuilder(); @@ -1132,14 +1119,14 @@ public class ProgramConverter sb.append( ELEMENT_DELIM ); sb.append(NEWLINE); } - sb.append( rSerializeProgramBlock(pb) ); + sb.append( rSerializeProgramBlock(pb, clsMap) ); count++; } return sb.toString(); } - public static String rSerializeProgramBlock( ProgramBlock pb) + private static String rSerializeProgramBlock( ProgramBlock pb, HashMap<String, byte[]> clsMap ) throws DMLRuntimeException { StringBuilder sb = new StringBuilder(); @@ -1160,29 +1147,22 @@ public class ProgramConverter else //all generic program blocks sb.append( PARFOR_PB_BEGIN ); - //handle variables (not required only on top level) - /*sb.append( PARFOR_VARS_BEGIN ); - sb.append( serializeVariables( ) ); - sb.append( PARFOR_VARS_END ); - sb.append( COMPONENTS_DELIM ); - */ - //handle body if( pb instanceof WhileProgramBlock ) { WhileProgramBlock wpb = (WhileProgramBlock) pb; sb.append( PARFOR_INST_BEGIN ); - sb.append( serializeInstructions( wpb.getPredicate() ) ); + sb.append( serializeInstructions( wpb.getPredicate(), clsMap ) ); sb.append( PARFOR_INST_END ); sb.append( COMPONENTS_DELIM ); sb.append( wpb.getPredicateResultVar() ); sb.append( COMPONENTS_DELIM ); sb.append( PARFOR_INST_BEGIN ); - sb.append( serializeInstructions( wpb.getExitInstructions() ) ); + sb.append( serializeInstructions( wpb.getExitInstructions(), clsMap ) ); sb.append( PARFOR_INST_END ); sb.append( COMPONENTS_DELIM ); sb.append( PARFOR_PBS_BEGIN ); - sb.append( rSerializeProgramBlocks( wpb.getChildBlocks()) ); + sb.append( rSerializeProgramBlocks( wpb.getChildBlocks(), clsMap) ); sb.append( PARFOR_PBS_END ); } else if ( pb instanceof ForProgramBlock && !(pb instanceof ParForProgramBlock ) ) @@ -1191,23 +1171,23 @@ public class ProgramConverter sb.append( serializeStringArray(fpb.getIterablePredicateVars()) ); sb.append( COMPONENTS_DELIM ); sb.append( PARFOR_INST_BEGIN ); - sb.append( serializeInstructions( fpb.getFromInstructions() ) ); + sb.append( serializeInstructions( fpb.getFromInstructions(), clsMap ) ); sb.append( PARFOR_INST_END ); sb.append( COMPONENTS_DELIM ); sb.append( PARFOR_INST_BEGIN ); - sb.append( serializeInstructions( fpb.getToInstructions() ) ); + sb.append( serializeInstructions(fpb.getToInstructions(), clsMap) ); sb.append( PARFOR_INST_END ); sb.append( COMPONENTS_DELIM ); sb.append( PARFOR_INST_BEGIN ); - sb.append( serializeInstructions( fpb.getIncrementInstructions() ) ); + sb.append( serializeInstructions(fpb.getIncrementInstructions(), clsMap) ); sb.append( PARFOR_INST_END ); sb.append( COMPONENTS_DELIM ); sb.append( PARFOR_INST_BEGIN ); - sb.append( serializeInstructions( fpb.getExitInstructions() ) ); + sb.append( serializeInstructions(fpb.getExitInstructions(), clsMap) ); sb.append( PARFOR_INST_END ); sb.append( COMPONENTS_DELIM ); sb.append( PARFOR_PBS_BEGIN ); - sb.append( rSerializeProgramBlocks( fpb.getChildBlocks()) ); + sb.append( rSerializeProgramBlocks(fpb.getChildBlocks(), clsMap) ); sb.append( PARFOR_PBS_END ); } else if ( pb instanceof ParForProgramBlock ) @@ -1225,44 +1205,44 @@ public class ProgramConverter sb.append( serializeStringHashMap( pfpb.getParForParams()) ); //parameters of nested parfor sb.append( COMPONENTS_DELIM ); sb.append( PARFOR_INST_BEGIN ); - sb.append( serializeInstructions( pfpb.getFromInstructions() ) ); + sb.append( serializeInstructions(pfpb.getFromInstructions(), clsMap) ); sb.append( PARFOR_INST_END ); sb.append( COMPONENTS_DELIM ); sb.append( PARFOR_INST_BEGIN ); - sb.append( serializeInstructions( pfpb.getToInstructions() ) ); + sb.append( serializeInstructions(pfpb.getToInstructions(), clsMap) ); sb.append( PARFOR_INST_END ); sb.append( COMPONENTS_DELIM ); sb.append( PARFOR_INST_BEGIN ); - sb.append( serializeInstructions( pfpb.getIncrementInstructions() ) ); + sb.append( serializeInstructions(pfpb.getIncrementInstructions(), clsMap) ); sb.append( PARFOR_INST_END ); sb.append( COMPONENTS_DELIM ); sb.append( PARFOR_INST_BEGIN ); - sb.append( serializeInstructions( pfpb.getExitInstructions() ) ); + sb.append( serializeInstructions(pfpb.getExitInstructions(), clsMap) ); sb.append( PARFOR_INST_END ); sb.append( COMPONENTS_DELIM ); sb.append( PARFOR_PBS_BEGIN ); - sb.append( rSerializeProgramBlocks( pfpb.getChildBlocks() ) ); + sb.append( rSerializeProgramBlocks( pfpb.getChildBlocks(), clsMap ) ); sb.append( PARFOR_PBS_END ); } else if ( pb instanceof IfProgramBlock ) { IfProgramBlock ipb = (IfProgramBlock) pb; sb.append( PARFOR_INST_BEGIN ); - sb.append( serializeInstructions( ipb.getPredicate() ) ); + sb.append( serializeInstructions(ipb.getPredicate(), clsMap) ); sb.append( PARFOR_INST_END ); sb.append( COMPONENTS_DELIM ); sb.append( ipb.getPredicateResultVar() ); sb.append( COMPONENTS_DELIM ); sb.append( PARFOR_INST_BEGIN ); - sb.append( serializeInstructions( ipb.getExitInstructions() ) ); + sb.append( serializeInstructions(ipb.getExitInstructions(), clsMap) ); sb.append( PARFOR_INST_END ); sb.append( COMPONENTS_DELIM ); sb.append( PARFOR_PBS_BEGIN ); - sb.append( rSerializeProgramBlocks( ipb.getChildBlocksIfBody() ) ); + sb.append( rSerializeProgramBlocks(ipb.getChildBlocksIfBody(), clsMap) ); sb.append( PARFOR_PBS_END ); sb.append( COMPONENTS_DELIM ); sb.append( PARFOR_PBS_BEGIN ); - sb.append( rSerializeProgramBlocks( ipb.getChildBlocksElseBody() ) ); + sb.append( rSerializeProgramBlocks(ipb.getChildBlocksElseBody(), clsMap) ); sb.append( PARFOR_PBS_END ); } else if( pb instanceof FunctionProgramBlock && !(pb instanceof ExternalFunctionProgramBlock) ) @@ -1274,11 +1254,11 @@ public class ProgramConverter sb.append( serializeDataIdentifiers( fpb.getOutputParams() ) ); sb.append( COMPONENTS_DELIM ); sb.append( PARFOR_INST_BEGIN ); - sb.append( serializeInstructions( fpb.getInstructions() ) ); + sb.append( serializeInstructions(fpb.getInstructions(), clsMap) ); sb.append( PARFOR_INST_END ); sb.append( COMPONENTS_DELIM ); sb.append( PARFOR_PBS_BEGIN ); - sb.append( rSerializeProgramBlocks( fpb.getChildBlocks() ) ); + sb.append( rSerializeProgramBlocks(fpb.getChildBlocks(), clsMap) ); sb.append( PARFOR_PBS_END ); sb.append( COMPONENTS_DELIM ); } @@ -1302,17 +1282,16 @@ public class ProgramConverter sb.append( PARFOR_INST_BEGIN ); //create on construction anyway - //sb.append( serializeInstructions( fpb.getInstructions() ) ); sb.append( PARFOR_INST_END ); sb.append( COMPONENTS_DELIM ); sb.append( PARFOR_PBS_BEGIN ); - sb.append( rSerializeProgramBlocks( fpb.getChildBlocks() ) ); + sb.append( rSerializeProgramBlocks(fpb.getChildBlocks(), clsMap) ); sb.append( PARFOR_PBS_END ); } else //all generic program blocks { sb.append( PARFOR_INST_BEGIN ); - sb.append( serializeInstructions( pb.getInstructions() ) ); + sb.append( serializeInstructions(pb.getInstructions(), clsMap) ); sb.append( PARFOR_INST_END ); } @@ -1404,7 +1383,7 @@ public class ProgramConverter return prog; } - public static LocalVariableMap parseVariables(String in) + private static LocalVariableMap parseVariables(String in) throws DMLRuntimeException { LocalVariableMap ret = null; @@ -1422,7 +1401,7 @@ public class ProgramConverter return ret; } - public static HashMap<String,FunctionProgramBlock> parseFunctionProgramBlocks( String in, Program prog, int id ) + private static HashMap<String,FunctionProgramBlock> parseFunctionProgramBlocks( String in, Program prog, int id ) throws DMLRuntimeException { HashMap<String,FunctionProgramBlock> ret = new HashMap<String, FunctionProgramBlock>(); @@ -1437,18 +1416,12 @@ public class ProgramConverter String tmp1 = lvar.substring(0, index); // + CP_CHILD_THREAD+id; String tmp2 = lvar.substring(index + 1); ret.put(tmp1, (FunctionProgramBlock)rParseProgramBlock(tmp2, prog, id)); - - //put first copy into prog (for future deep copies) - //index = lvar2.indexOf("="); - //tmp1 = lvar2.substring(0, index); - //tmp2 = lvar2.substring(index + 1); - //ret.put(tmp1, (FunctionProgramBlock)rParseProgramBlock(tmp2, prog, id)); } return ret; } - public static ArrayList<ProgramBlock> rParseProgramBlocks(String in, Program prog, int id) + private static ArrayList<ProgramBlock> rParseProgramBlocks(String in, Program prog, int id) throws DMLRuntimeException { ArrayList<ProgramBlock> pbs = new ArrayList<ProgramBlock>(); @@ -1464,7 +1437,7 @@ public class ProgramConverter return pbs; } - public static ProgramBlock rParseProgramBlock( String in, Program prog, int id ) + private static ProgramBlock rParseProgramBlock( String in, Program prog, int id ) throws DMLRuntimeException { ProgramBlock pb = null; @@ -1489,7 +1462,7 @@ public class ProgramConverter return pb; } - public static WhileProgramBlock rParseWhileProgramBlock( String in, Program prog, int id ) + private static WhileProgramBlock rParseWhileProgramBlock( String in, Program prog, int id ) throws DMLRuntimeException { String lin = in.substring( PARFOR_PB_WHILE.length(),in.length()-PARFOR_PB_END.length()); @@ -1516,7 +1489,7 @@ public class ProgramConverter return wpb; } - public static ForProgramBlock rParseForProgramBlock( String in, Program prog, int id ) + private static ForProgramBlock rParseForProgramBlock( String in, Program prog, int id ) throws DMLRuntimeException { String lin = in.substring( PARFOR_PB_FOR.length(),in.length()-PARFOR_PB_END.length()); @@ -1549,7 +1522,7 @@ public class ProgramConverter return fpb; } - public static ParForProgramBlock rParseParForProgramBlock( String in, Program prog, int id ) + private static ParForProgramBlock rParseParForProgramBlock( String in, Program prog, int id ) throws DMLRuntimeException { String lin = in.substring( PARFOR_PB_PARFOR.length(),in.length()-PARFOR_PB_END.length()); @@ -1586,7 +1559,7 @@ public class ProgramConverter return pfpb; } - public static IfProgramBlock rParseIfProgramBlock( String in, Program prog, int id ) + private static IfProgramBlock rParseIfProgramBlock( String in, Program prog, int id ) throws DMLRuntimeException { String lin = in.substring( PARFOR_PB_IF.length(),in.length()-PARFOR_PB_END.length()); @@ -1615,7 +1588,7 @@ public class ProgramConverter return ipb; } - public static FunctionProgramBlock rParseFunctionProgramBlock( String in, Program prog, int id ) + private static FunctionProgramBlock rParseFunctionProgramBlock( String in, Program prog, int id ) throws DMLRuntimeException { String lin = in.substring( PARFOR_PB_FC.length(),in.length()-PARFOR_PB_END.length()); @@ -1643,7 +1616,7 @@ public class ProgramConverter return fpb; } - public static ExternalFunctionProgramBlock rParseExternalFunctionProgramBlock( String in, Program prog, int id ) + private static ExternalFunctionProgramBlock rParseExternalFunctionProgramBlock( String in, Program prog, int id ) throws DMLRuntimeException { String lin = in.substring( PARFOR_PB_EFC.length(),in.length()-PARFOR_PB_END.length()); @@ -1677,7 +1650,7 @@ public class ProgramConverter return efpb; } - public static ProgramBlock rParseGenericProgramBlock( String in, Program prog, int id ) + private static ProgramBlock rParseGenericProgramBlock( String in, Program prog, int id ) throws DMLRuntimeException { String lin = in.substring( PARFOR_PB_BEGIN.length(),in.length()-PARFOR_PB_END.length()); @@ -1691,7 +1664,7 @@ public class ProgramConverter return pb; } - public static ArrayList<Instruction> parseInstructions( String in, int id ) + private static ArrayList<Instruction> parseInstructions( String in, int id ) throws DMLRuntimeException { ArrayList<Instruction> insts = new ArrayList<Instruction>(); @@ -1718,7 +1691,7 @@ public class ProgramConverter return insts; } - public static HashMap<String,String> parseStringHashMap( String in ) + private static HashMap<String,String> parseStringHashMap( String in ) { HashMap<String,String> vars = new HashMap<String, String>(); StringTokenizer st = new StringTokenizer(in,ELEMENT_DELIM); @@ -1734,7 +1707,7 @@ public class ProgramConverter return vars; } - public static ArrayList<String> parseStringArrayList( String in ) + private static ArrayList<String> parseStringArrayList( String in ) { ArrayList<String> vars = new ArrayList<String>(); StringTokenizer st = new StringTokenizer(in,ELEMENT_DELIM); @@ -1747,7 +1720,7 @@ public class ProgramConverter return vars; } - public static String[] parseStringArray( String in ) + private static String[] parseStringArray( String in ) { StringTokenizer st = new StringTokenizer(in, ELEMENT_DELIM); int len = st.countTokens(); @@ -1763,7 +1736,7 @@ public class ProgramConverter return a; } - public static ArrayList<DataIdentifier> parseDataIdentifiers( String in ) + private static ArrayList<DataIdentifier> parseDataIdentifiers( String in ) { ArrayList<DataIdentifier> vars = new ArrayList<DataIdentifier>(); StringTokenizer st = new StringTokenizer(in, ELEMENT_DELIM); @@ -1777,7 +1750,7 @@ public class ProgramConverter return vars; } - public static DataIdentifier parseDataIdentifier( String in ) + private static DataIdentifier parseDataIdentifier( String in ) { StringTokenizer st = new StringTokenizer(in, DATA_FIELD_DELIM); String name = st.nextToken(); @@ -1867,7 +1840,7 @@ public class ProgramConverter return ret; } - public static ExecutionContext parseExecutionContext(String in, Program prog) + private static ExecutionContext parseExecutionContext(String in, Program prog) throws DMLRuntimeException { ExecutionContext ec = null; @@ -1884,9 +1857,7 @@ public class ProgramConverter return ec; } - public static void parseAndSetAdditionalConfigurations(String conf) - { - //set statistics flag + private static void parseAndSetAdditionalConfigurations(String conf) { String[] statsFlag = conf.split("="); DMLScript.STATISTICS = Boolean.parseBoolean(statsFlag[1]); } @@ -1903,7 +1874,7 @@ public class ProgramConverter * @return instruction * @throws DMLRuntimeException if DMLRuntimeException occurs */ - public static Instruction saveReplaceThreadID( Instruction inst, String pattern, String replacement ) + private static Instruction saveReplaceThreadID( Instruction inst, String pattern, String replacement ) throws DMLRuntimeException { //currently known, relevant instructions: createvar, rand, seq, extfunct, http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d62b2fce/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java index 0c4b570..6f46ab8 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java @@ -19,6 +19,7 @@ package org.apache.sysml.runtime.controlprogram.parfor; +import java.util.HashMap; import java.util.List; import org.apache.commons.logging.Log; @@ -56,9 +57,9 @@ public class RemoteDPParForSpark protected static final Log LOG = LogFactory.getLog(RemoteDPParForSpark.class.getName()); - public static RemoteParForJobReturn runJob(long pfid, String itervar, String matrixvar, String program, String resultFile, - MatrixObject input, ExecutionContext ec, PDataPartitionFormat dpf, OutputInfo oi, boolean tSparseCol, //config params - boolean enableCPCaching, int numReducers ) //opt params + public static RemoteParForJobReturn runJob(long pfid, String itervar, String matrixvar, String program, HashMap<String, byte[]> clsMap, + String resultFile, MatrixObject input, ExecutionContext ec, PDataPartitionFormat dpf, OutputInfo oi, + boolean tSparseCol, boolean enableCPCaching, int numReducers ) throws DMLRuntimeException { String jobname = "ParFor-DPESP"; @@ -84,7 +85,7 @@ public class RemoteDPParForSpark //core parfor datapartition-execute DataPartitionerRemoteSparkMapper dpfun = new DataPartitionerRemoteSparkMapper(mc, ii, oi, dpf); - RemoteDPParForSparkWorker efun = new RemoteDPParForSparkWorker(program, matrixvar, itervar, + RemoteDPParForSparkWorker efun = new RemoteDPParForSparkWorker(program, clsMap, matrixvar, itervar, enableCPCaching, mc, tSparseCol, dpf, oi, aTasks, aIters); List<Tuple2<Long,String>> out = in.flatMapToPair(dpfun) //partition the input blocks http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d62b2fce/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java index 458f149..67d59dc 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java @@ -21,13 +21,16 @@ package org.apache.sysml.runtime.controlprogram.parfor; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.Iterator; +import java.util.Map.Entry; import org.apache.hadoop.io.Writable; import org.apache.spark.TaskContext; import org.apache.spark.api.java.function.PairFlatMapFunction; import org.apache.spark.util.LongAccumulator; import org.apache.sysml.runtime.DMLRuntimeException; +import org.apache.sysml.runtime.codegen.CodegenUtils; import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat; import org.apache.sysml.runtime.controlprogram.caching.CacheableData; import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; @@ -49,6 +52,7 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF private static final long serialVersionUID = 30223759283155139L; private final String _prog; + private final HashMap<String, byte[]> _clsMap; private final boolean _caching; private final String _inputVar; private final String _iterVar; @@ -64,10 +68,13 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF private final LongAccumulator _aTasks; private final LongAccumulator _aIters; - public RemoteDPParForSparkWorker(String program, String inputVar, String iterVar, boolean cpCaching, MatrixCharacteristics mc, boolean tSparseCol, PDataPartitionFormat dpf, OutputInfo oinfo, LongAccumulator atasks, LongAccumulator aiters) + public RemoteDPParForSparkWorker(String program, HashMap<String, byte[]> clsMap, String inputVar, String iterVar, + boolean cpCaching, MatrixCharacteristics mc, boolean tSparseCol, PDataPartitionFormat dpf, OutputInfo oinfo, + LongAccumulator atasks, LongAccumulator aiters) throws DMLRuntimeException { _prog = program; + _clsMap = clsMap; _caching = cpCaching; _inputVar = inputVar; _iterVar = iterVar; @@ -93,7 +100,7 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF ArrayList<Tuple2<Long,String>> ret = new ArrayList<Tuple2<Long,String>>(); //lazy parworker initialization - configureWorker( TaskContext.get().taskAttemptId() ); //requires Spark 1.3 + configureWorker( TaskContext.get().taskAttemptId() ); //process all matrix partitions of this data partition MatrixBlock partition = null; @@ -138,6 +145,12 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF { _workerID = ID; + //initialize codegen class cache (before program parsing) + synchronized( CodegenUtils.class ) { + for( Entry<String, byte[]> e : _clsMap.entrySet() ) + CodegenUtils.loadClass(e.getKey(), e.getValue()); + } + //parse and setup parfor body program ParForBody body = ProgramConverter.parseParForBody(_prog, (int)_workerID); _childBlocks = body.getChildBlocks(); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d62b2fce/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java index 9d3f0f3..5e69b19 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java @@ -19,6 +19,7 @@ package org.apache.sysml.runtime.controlprogram.parfor; +import java.util.HashMap; import java.util.List; import org.apache.commons.logging.Log; @@ -53,8 +54,8 @@ public class RemoteParForSpark protected static final Log LOG = LogFactory.getLog(RemoteParForSpark.class.getName()); - public static RemoteParForJobReturn runJob(long pfid, String program, List<Task> tasks, ExecutionContext ec, - boolean cpCaching, int numMappers) + public static RemoteParForJobReturn runJob(long pfid, String program, HashMap<String, byte[]> clsMap, + List<Task> tasks, ExecutionContext ec, boolean cpCaching, int numMappers) throws DMLRuntimeException { String jobname = "ParFor-ESP"; @@ -69,7 +70,7 @@ public class RemoteParForSpark //run remote_spark parfor job //(w/o lazy evaluation to fit existing parfor framework, e.g., result merge) - RemoteParForSparkWorker func = new RemoteParForSparkWorker(program, cpCaching, aTasks, aIters); + RemoteParForSparkWorker func = new RemoteParForSparkWorker(program, clsMap, cpCaching, aTasks, aIters); List<Tuple2<Long,String>> out = sc.parallelize( tasks, numMappers ) //create rdd of parfor tasks .flatMapToPair( func ) //execute parfor tasks http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d62b2fce/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java index 2ea802d..6ed5d1f 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java @@ -21,12 +21,15 @@ package org.apache.sysml.runtime.controlprogram.parfor; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.Iterator; +import java.util.Map.Entry; import org.apache.spark.TaskContext; import org.apache.spark.api.java.function.PairFlatMapFunction; import org.apache.spark.util.LongAccumulator; import org.apache.sysml.runtime.DMLRuntimeException; +import org.apache.sysml.runtime.codegen.CodegenUtils; import org.apache.sysml.runtime.controlprogram.caching.CacheableData; import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; import org.apache.sysml.runtime.controlprogram.parfor.util.IDHandler; @@ -39,16 +42,18 @@ public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFun private static final long serialVersionUID = -3254950138084272296L; private final String _prog; + private final HashMap<String, byte[]> _clsMap; private boolean _initialized = false; private boolean _caching = true; private final LongAccumulator _aTasks; private final LongAccumulator _aIters; - public RemoteParForSparkWorker(String program, boolean cpCaching, LongAccumulator atasks, LongAccumulator aiters) + public RemoteParForSparkWorker(String program, HashMap<String, byte[]> clsMap, boolean cpCaching, LongAccumulator atasks, LongAccumulator aiters) throws DMLRuntimeException { _prog = program; + _clsMap = clsMap; _initialized = false; _caching = cpCaching; @@ -88,6 +93,12 @@ public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFun { _workerID = ID; + //initialize codegen class cache (before program parsing) + synchronized( CodegenUtils.class ) { + for( Entry<String, byte[]> e : _clsMap.entrySet() ) + CodegenUtils.loadClass(e.getKey(), e.getValue()); + } + //parse and setup parfor body program ParForBody body = ProgramConverter.parseParForBody(_prog, (int)_workerID); _childBlocks = body.getChildBlocks(); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d62b2fce/src/main/java/org/apache/sysml/runtime/instructions/cp/SpoofCPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cp/SpoofCPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cp/SpoofCPInstruction.java index 61313d7..52bc926 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/cp/SpoofCPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/cp/SpoofCPInstruction.java @@ -44,15 +44,18 @@ public class SpoofCPInstruction extends ComputationCPInstruction _numThreads = k; _in = in; } + + public Class<?> getOperatorClass() { + return _class; + } public static SpoofCPInstruction parseInstruction(String str) throws DMLRuntimeException { String[] parts = InstructionUtils.getInstructionPartsWithValueType(str); - //String opcode = parts[0]; ArrayList<CPOperand> inlist = new ArrayList<CPOperand>(); - Class<?> cla = CodegenUtils.loadClass(parts[1], null); + Class<?> cla = CodegenUtils.loadClass(parts[1]); String opcode = parts[0] + CodegenUtils.getSpoofType(cla); for( int i=2; i<parts.length-2; i++ ) http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d62b2fce/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java index 15b0751..41bcffb 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java @@ -73,7 +73,7 @@ public class SpoofSPInstruction extends SPInstruction //String opcode = parts[0]; ArrayList<CPOperand> inlist = new ArrayList<CPOperand>(); - Class<?> cls = CodegenUtils.loadClass(parts[1], null); + Class<?> cls = CodegenUtils.loadClass(parts[1]); byte[] classBytes = CodegenUtils.getClassAsByteArray(parts[1]); String opcode = parts[0] + CodegenUtils.getSpoofType(cls);
