http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/runtime/instructions/cp/ParamservBuiltinCPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cp/ParamservBuiltinCPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cp/ParamservBuiltinCPInstruction.java index 3d61625..4e7a718 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/cp/ParamservBuiltinCPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/cp/ParamservBuiltinCPInstruction.java @@ -39,6 +39,8 @@ import static org.apache.sysml.parser.Statement.PS_UPDATE_TYPE; import static org.apache.sysml.parser.Statement.PS_VAL_FEATURES; import static org.apache.sysml.parser.Statement.PS_VAL_LABELS; +import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.concurrent.ExecutionException; @@ -54,6 +56,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.sysml.api.DMLScript; +import org.apache.sysml.hops.recompile.Recompiler; +import org.apache.sysml.lops.LopProperties; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.LocalVariableMap; import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; @@ -65,10 +69,12 @@ import org.apache.sysml.runtime.controlprogram.paramserv.LocalPSWorker; import org.apache.sysml.runtime.controlprogram.paramserv.LocalParamServer; import org.apache.sysml.runtime.controlprogram.paramserv.ParamServer; import org.apache.sysml.runtime.controlprogram.paramserv.ParamservUtils; +import org.apache.sysml.runtime.controlprogram.paramserv.spark.SparkPSBody; import org.apache.sysml.runtime.controlprogram.paramserv.spark.SparkPSWorker; import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; import org.apache.sysml.runtime.controlprogram.parfor.stat.Timing; import org.apache.sysml.runtime.matrix.operators.Operator; +import org.apache.sysml.runtime.util.ProgramConverter; import org.apache.sysml.utils.Statistics; public class ParamservBuiltinCPInstruction extends ParameterizedBuiltinCPInstruction { @@ -110,13 +116,30 @@ public class ParamservBuiltinCPInstruction extends ParameterizedBuiltinCPInstruc private void runOnSpark(SparkExecutionContext sec, PSModeType mode) { PSScheme scheme = getScheme(); int workerNum = getWorkerNum(mode); + String updFunc = getParam(PS_UPDATE_FUN); + String aggFunc = getParam(PS_AGGREGATION_FUN); + + int k = getParLevel(workerNum); + + // Get the compiled execution context + LocalVariableMap newVarsMap = createVarsMap(sec); + ExecutionContext newEC = ParamservUtils.createExecutionContext(sec, newVarsMap, updFunc, aggFunc, k); MatrixObject features = sec.getMatrixObject(getParam(PS_FEATURES)); MatrixObject labels = sec.getMatrixObject(getParam(PS_LABELS)); - SparkPSWorker worker = new SparkPSWorker(); + // Force all the instructions to CP type + Recompiler.recompileProgramBlockHierarchy2Forced( + newEC.getProgram().getProgramBlocks(), 0, new HashSet<>(), LopProperties.ExecType.CP); + + // Serialize all the needed params for remote workers + SparkPSBody body = new SparkPSBody(newEC); + HashMap<String, byte[]> clsMap = new HashMap<>(); + String program = ProgramConverter.serializeSparkPSBody(body, clsMap); + + SparkPSWorker worker = new SparkPSWorker(getParam(PS_UPDATE_FUN), 
getFrequency(), getEpochs(), getBatchSize(), program, clsMap); ParamservUtils.doPartitionOnSpark(sec, features, labels, scheme, workerNum) // Do data partitioning - .foreach(worker); // Run remote workers + .foreach(worker); // Run remote workers } @@ -132,15 +155,14 @@ public class ParamservBuiltinCPInstruction extends ParameterizedBuiltinCPInstruc int k = getParLevel(workerNum); // Get the compiled execution context - // Create workers' execution context LocalVariableMap newVarsMap = createVarsMap(ec); - List<ExecutionContext> newECs = ParamservUtils.createExecutionContexts(ec, newVarsMap, updFunc, aggFunc, workerNum, k); + ExecutionContext newEC = ParamservUtils.createExecutionContext(ec, newVarsMap, updFunc, aggFunc, k); // Create workers' execution context - List<ExecutionContext> workerECs = newECs.subList(0, newECs.size() - 1); + List<ExecutionContext> workerECs = ParamservUtils.copyExecutionContext(newEC, workerNum); // Create the agg service's execution context - ExecutionContext aggServiceEC = newECs.get(newECs.size() - 1); + ExecutionContext aggServiceEC = ParamservUtils.copyExecutionContext(newEC, 1).get(0); PSFrequency freq = getFrequency(); PSUpdateType updateType = getUpdateType();
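The runOnSpark() hunk above introduces a serialize/ship/rebuild pattern for the paramserv program: the driver compiles a worker execution context, forces all instructions to CP, wraps the context in a SparkPSBody, and serializes it (together with the bytecode of any generated operators) for the remote Spark workers, which later reconstruct it via ProgramConverter.parseSparkPSBody. The condensed sketch below illustrates that round trip using only classes and methods touched by this commit; the helper class and method names (ParamservSerializationSketch, serializeForWorkers, deserializeOnWorker) are illustrative and not part of the commit.

import java.util.HashMap;
import java.util.HashSet;

import org.apache.sysml.hops.recompile.Recompiler;
import org.apache.sysml.lops.LopProperties;
import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysml.runtime.controlprogram.paramserv.spark.SparkPSBody;
import org.apache.sysml.runtime.util.ProgramConverter;

public class ParamservSerializationSketch {

    /** Driver side: force CP execution types and serialize the compiled body for shipping. */
    public static String serializeForWorkers(ExecutionContext compiledEC, HashMap<String, byte[]> clsMap) {
        // Remote workers execute plain CP instructions, so recompile the whole hierarchy to CP first.
        Recompiler.recompileProgramBlockHierarchy2Forced(
            compiledEC.getProgram().getProgramBlocks(), 0, new HashSet<>(), LopProperties.ExecType.CP);
        // Serialize functions, symbol table, and program blocks into one string;
        // bytecode of generated operators is collected into clsMap as a side effect.
        return ProgramConverter.serializeSparkPSBody(new SparkPSBody(compiledEC), clsMap);
    }

    /** Worker side: rebuild the execution context from the serialized body. */
    public static ExecutionContext deserializeOnWorker(String program, int workerId) {
        SparkPSBody body = ProgramConverter.parseSparkPSBody(program, workerId);
        return body.getEc();
    }
}

Forcing LopProperties.ExecType.CP up front is what allows serializeInstructions (further down in ProgramConverter) to reject anything other than CP instructions or external function invocations when the body is shipped to the workers.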
http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/runtime/instructions/cp/VariableCPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cp/VariableCPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cp/VariableCPInstruction.java index 7bf941d..a1c89a3 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/cp/VariableCPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/cp/VariableCPInstruction.java @@ -40,7 +40,7 @@ import org.apache.sysml.runtime.controlprogram.caching.FrameObject; import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; import org.apache.sysml.runtime.controlprogram.caching.MatrixObject.UpdateType; import org.apache.sysml.runtime.controlprogram.context.ExecutionContext; -import org.apache.sysml.runtime.controlprogram.parfor.ProgramConverter; +import org.apache.sysml.runtime.util.ProgramConverter; import org.apache.sysml.runtime.controlprogram.parfor.util.IDSequence; import org.apache.sysml.runtime.instructions.Instruction; import org.apache.sysml.runtime.instructions.InstructionUtils; http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/runtime/util/ProgramConverter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/util/ProgramConverter.java b/src/main/java/org/apache/sysml/runtime/util/ProgramConverter.java new file mode 100644 index 0000000..1d2115e --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/util/ProgramConverter.java @@ -0,0 +1,1838 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sysml.runtime.util; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.StringTokenizer; +import java.util.stream.Collectors; +import java.util.Map.Entry; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.mapred.JobConf; +import org.apache.sysml.api.DMLScript; +import org.apache.sysml.conf.CompilerConfig.ConfigType; +import org.apache.sysml.conf.CompilerConfig; +import org.apache.sysml.conf.ConfigurationManager; +import org.apache.sysml.conf.DMLConfig; +import org.apache.sysml.hops.Hop; +import org.apache.sysml.hops.OptimizerUtils; +import org.apache.sysml.hops.recompile.Recompiler; +import org.apache.sysml.lops.Lop; +import org.apache.sysml.parser.DMLProgram; +import org.apache.sysml.parser.DataIdentifier; +import org.apache.sysml.parser.ForStatementBlock; +import org.apache.sysml.parser.IfStatementBlock; +import org.apache.sysml.parser.ParForStatementBlock; +import org.apache.sysml.parser.StatementBlock; +import org.apache.sysml.parser.Expression.DataType; +import org.apache.sysml.parser.Expression.ValueType; +import org.apache.sysml.parser.ParForStatementBlock.ResultVar; +import org.apache.sysml.parser.WhileStatementBlock; +import org.apache.sysml.runtime.DMLRuntimeException; +import org.apache.sysml.runtime.codegen.CodegenUtils; +import org.apache.sysml.runtime.controlprogram.ExternalFunctionProgramBlock; +import org.apache.sysml.runtime.controlprogram.ExternalFunctionProgramBlockCP; +import org.apache.sysml.runtime.controlprogram.ForProgramBlock; +import org.apache.sysml.runtime.controlprogram.FunctionProgramBlock; +import org.apache.sysml.runtime.controlprogram.IfProgramBlock; +import org.apache.sysml.runtime.controlprogram.LocalVariableMap; +import org.apache.sysml.runtime.controlprogram.ParForProgramBlock; +import org.apache.sysml.runtime.controlprogram.Program; +import org.apache.sysml.runtime.controlprogram.ProgramBlock; +import org.apache.sysml.runtime.controlprogram.WhileProgramBlock; +import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat; +import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PExecMode; +import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PartitionFormat; +import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; +import org.apache.sysml.runtime.controlprogram.caching.MatrixObject.UpdateType; +import org.apache.sysml.runtime.controlprogram.context.ExecutionContext; +import org.apache.sysml.runtime.controlprogram.context.ExecutionContextFactory; +import org.apache.sysml.runtime.controlprogram.paramserv.spark.SparkPSBody; +import org.apache.sysml.runtime.controlprogram.parfor.ParForBody; +import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; +import org.apache.sysml.runtime.instructions.CPInstructionParser; +import org.apache.sysml.runtime.instructions.Instruction; +import org.apache.sysml.runtime.instructions.InstructionParser; +import org.apache.sysml.runtime.instructions.MRJobInstruction; +import org.apache.sysml.runtime.instructions.cp.BooleanObject; +import org.apache.sysml.runtime.instructions.cp.CPInstruction; +import org.apache.sysml.runtime.instructions.cp.Data; +import org.apache.sysml.runtime.instructions.cp.DoubleObject; +import org.apache.sysml.runtime.instructions.cp.FunctionCallCPInstruction; +import 
org.apache.sysml.runtime.instructions.cp.IntObject; +import org.apache.sysml.runtime.instructions.cp.ListObject; +import org.apache.sysml.runtime.instructions.cp.ScalarObject; +import org.apache.sysml.runtime.instructions.cp.SpoofCPInstruction; +import org.apache.sysml.runtime.instructions.cp.StringObject; +import org.apache.sysml.runtime.instructions.cp.VariableCPInstruction; +import org.apache.sysml.runtime.instructions.gpu.GPUInstruction; +import org.apache.sysml.runtime.instructions.mr.MRInstruction; +import org.apache.sysml.runtime.instructions.spark.SPInstruction; +import org.apache.sysml.runtime.matrix.MatrixCharacteristics; +import org.apache.sysml.runtime.matrix.MetaDataFormat; +import org.apache.sysml.runtime.matrix.data.InputInfo; +import org.apache.sysml.runtime.matrix.data.MatrixBlock; +import org.apache.sysml.runtime.matrix.data.OutputInfo; +import org.apache.sysml.udf.ExternalFunctionInvocationInstruction; + +/** + * Program converter functionalities for + * (1) creating deep copies of program blocks, instructions, function program blocks, and + * (2) serializing and parsing of programs, program blocks, functions program blocks. + * + */ +//TODO: rewrite class to instance-based invocation (grown gradually and now inappropriate design) +public class ProgramConverter +{ + protected static final Log LOG = LogFactory.getLog(ProgramConverter.class.getName()); + + //use escaped unicodes for separators in order to prevent string conflict + public static final String NEWLINE = "\n"; //System.lineSeparator(); + public static final String COMPONENTS_DELIM = "\u236e"; //semicolon w/ bar; ";"; + public static final String ELEMENT_DELIM = "\u236a"; //comma w/ bar; ","; + public static final String ELEMENT_DELIM2 = ","; + public static final String DATA_FIELD_DELIM = "\u007c"; //"|"; + public static final String KEY_VALUE_DELIM = "\u003d"; //"="; + public static final String LEVELIN = "\u23a8"; //variant of left curly bracket; "\u007b"; //"{"; + public static final String LEVELOUT = "\u23ac"; //variant of right curly bracket; "\u007d"; //"}"; + public static final String EMPTY = "null"; + public static final String DASH = "-"; + public static final String REF = "ref"; + public static final String LIST_ELEMENT_DELIM = "\t"; + + public static final String CDATA_BEGIN = "<![CDATA["; + public static final String CDATA_END = " ]]>"; + + public static final String PROG_BEGIN = " PROG" + LEVELIN; + public static final String PROG_END = LEVELOUT; + public static final String VARS_BEGIN = "VARS: "; + public static final String VARS_END = ""; + public static final String PBS_BEGIN = " PBS" + LEVELIN; + public static final String PBS_END = LEVELOUT; + public static final String INST_BEGIN = " INST: "; + public static final String INST_END = ""; + public static final String EC_BEGIN = " EC: "; + public static final String EC_END = ""; + public static final String PB_BEGIN = " PB" + LEVELIN; + public static final String PB_END = LEVELOUT; + public static final String PB_WHILE = " WHILE" + LEVELIN; + public static final String PB_FOR = " FOR" + LEVELIN; + public static final String PB_PARFOR = " PARFOR" + LEVELIN; + public static final String PB_IF = " IF" + LEVELIN; + public static final String PB_FC = " FC" + LEVELIN; + public static final String PB_EFC = " EFC" + LEVELIN; + + public static final String CONF_STATS = "stats"; + + // Used for parfor + public static final String PARFORBODY_BEGIN = CDATA_BEGIN + "PARFORBODY" + LEVELIN; + public static final String PARFORBODY_END = LEVELOUT + CDATA_END; + 
+ // Used for paramserv builtin function + public static final String PSBODY_BEGIN = CDATA_BEGIN + "PSBODY" + LEVELIN; + public static final String PSBODY_END = LEVELOUT + CDATA_END; + + //exception msgs + public static final String NOT_SUPPORTED_EXTERNALFUNCTION_PB = "Not supported: ExternalFunctionProgramBlock contains MR instructions. " + + "(ExternalFunctionProgramBlockCP can be used)"; + public static final String NOT_SUPPORTED_MR_INSTRUCTION = "Not supported: Instructions of type other than CP instructions"; + public static final String NOT_SUPPORTED_MR_PARFOR = "Not supported: Nested ParFOR REMOTE_MR due to possible deadlocks." + + "(LOCAL can be used for inner ParFOR)"; + public static final String NOT_SUPPORTED_PB = "Not supported: type of program block"; + + //////////////////////////////// + // CREATION of DEEP COPIES + //////////////////////////////// + + /** + * Creates a deep copy of the given execution context. + * For rt_platform=Hadoop, execution context has a symbol table. + * + * @param ec execution context + * @return execution context + * @throws CloneNotSupportedException if CloneNotSupportedException occurs + */ + public static ExecutionContext createDeepCopyExecutionContext(ExecutionContext ec) + throws CloneNotSupportedException + { + ExecutionContext cpec = ExecutionContextFactory.createContext(false, ec.getProgram()); + cpec.setVariables((LocalVariableMap) ec.getVariables().clone()); + + //handle result variables with in-place update flag + //(each worker requires its own copy of the empty matrix object) + for( String var : cpec.getVariables().keySet() ) { + Data dat = cpec.getVariables().get(var); + if( dat instanceof MatrixObject && ((MatrixObject)dat).getUpdateType().isInPlace() ) { + MatrixObject mo = (MatrixObject)dat; + MatrixObject moNew = new MatrixObject(mo); + if( mo.getNnz() != 0 ){ + // If the output matrix is not empty (NNZ != 0), a local copy is created so that + // the update-in-place operation can be applied. + MatrixBlock mbVar = mo.acquireRead(); + moNew.acquireModify (new MatrixBlock(mbVar)); + mo.release(); + } else { + //create empty matrix block w/ dense representation (preferred for update in-place) + //Creating a dense matrix block is valid because the empty block is not allocated and the transfer + // to sparse representation happens in the left-indexing in-place operation. + moNew.acquireModify(new MatrixBlock((int)mo.getNumRows(), (int)mo.getNumColumns(), false)); + } + moNew.release(); + cpec.setVariable(var, moNew); + } + } + + return cpec; + } + + /** + * This recursively creates a deep copy of program blocks and transparently replaces filenames according to the + * specified parallel worker in order to avoid conflicts between parworkers. This happens recursively in order + * to support arbitrary control-flow constructs within a parfor. + * + * @param childBlocks child program blocks + * @param pid ? + * @param IDPrefix ? + * @param fnStack ? + * @param fnCreated ?
+ * @param plain if true, full deep copy without id replacement + * @param forceDeepCopy if true, force deep copy + * @return list of program blocks + */ + public static ArrayList<ProgramBlock> rcreateDeepCopyProgramBlocks(ArrayList<ProgramBlock> childBlocks, long pid, int IDPrefix, HashSet<String> fnStack, HashSet<String> fnCreated, boolean plain, boolean forceDeepCopy) + { + ArrayList<ProgramBlock> tmp = new ArrayList<>(); + + for( ProgramBlock pb : childBlocks ) + { + Program prog = pb.getProgram(); + ProgramBlock tmpPB = null; + + if( pb instanceof WhileProgramBlock ) { + tmpPB = createDeepCopyWhileProgramBlock((WhileProgramBlock) pb, pid, IDPrefix, prog, fnStack, fnCreated, plain, forceDeepCopy); + } + else if( pb instanceof ForProgramBlock && !(pb instanceof ParForProgramBlock) ) { + tmpPB = createDeepCopyForProgramBlock((ForProgramBlock) pb, pid, IDPrefix, prog, fnStack, fnCreated, plain, forceDeepCopy ); + } + else if( pb instanceof ParForProgramBlock ) { + ParForProgramBlock pfpb = (ParForProgramBlock) pb; + if( ParForProgramBlock.ALLOW_NESTED_PARALLELISM ) + tmpPB = createDeepCopyParForProgramBlock(pfpb, pid, IDPrefix, prog, fnStack, fnCreated, plain, forceDeepCopy); + else + tmpPB = createDeepCopyForProgramBlock((ForProgramBlock) pb, pid, IDPrefix, prog, fnStack, fnCreated, plain, forceDeepCopy); + } + else if( pb instanceof IfProgramBlock ) { + tmpPB = createDeepCopyIfProgramBlock((IfProgramBlock) pb, pid, IDPrefix, prog, fnStack, fnCreated, plain, forceDeepCopy); + } + else { //last-level program block + tmpPB = new ProgramBlock(prog); // general case use for most PBs + + //for recompile in the master node JVM + tmpPB.setStatementBlock(createStatementBlockCopy(pb.getStatementBlock(), pid, plain, forceDeepCopy)); + //tmpPB.setStatementBlock(pb.getStatementBlock()); + tmpPB.setThreadID(pid); + } + + //copy instructions + tmpPB.setInstructions( createDeepCopyInstructionSet(pb.getInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) ); + + //copy symbol table + //tmpPB.setVariables( pb.getVariables() ); //implicit cloning + + tmp.add(tmpPB); + } + + return tmp; + } + + public static WhileProgramBlock createDeepCopyWhileProgramBlock(WhileProgramBlock wpb, long pid, int IDPrefix, Program prog, HashSet<String> fnStack, HashSet<String> fnCreated, boolean plain, boolean forceDeepCopy) { + ArrayList<Instruction> predinst = createDeepCopyInstructionSet(wpb.getPredicate(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true); + WhileProgramBlock tmpPB = new WhileProgramBlock(prog, predinst); + tmpPB.setStatementBlock( createWhileStatementBlockCopy((WhileStatementBlock) wpb.getStatementBlock(), pid, plain, forceDeepCopy) ); + tmpPB.setThreadID(pid); + tmpPB.setExitInstructions2( createDeepCopyInstructionSet(wpb.getExitInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true)); + tmpPB.setChildBlocks(rcreateDeepCopyProgramBlocks(wpb.getChildBlocks(), pid, IDPrefix, fnStack, fnCreated, plain, forceDeepCopy)); + return tmpPB; + } + + public static IfProgramBlock createDeepCopyIfProgramBlock(IfProgramBlock ipb, long pid, int IDPrefix, Program prog, HashSet<String> fnStack, HashSet<String> fnCreated, boolean plain, boolean forceDeepCopy) { + ArrayList<Instruction> predinst = createDeepCopyInstructionSet(ipb.getPredicate(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true); + IfProgramBlock tmpPB = new IfProgramBlock(prog, predinst); + tmpPB.setStatementBlock( createIfStatementBlockCopy((IfStatementBlock)ipb.getStatementBlock(), pid, plain, forceDeepCopy ) ); + 
tmpPB.setThreadID(pid); + tmpPB.setExitInstructions2( createDeepCopyInstructionSet(ipb.getExitInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true)); + tmpPB.setChildBlocksIfBody(rcreateDeepCopyProgramBlocks(ipb.getChildBlocksIfBody(), pid, IDPrefix, fnStack, fnCreated, plain, forceDeepCopy)); + tmpPB.setChildBlocksElseBody(rcreateDeepCopyProgramBlocks(ipb.getChildBlocksElseBody(), pid, IDPrefix, fnStack, fnCreated, plain, forceDeepCopy)); + return tmpPB; + } + + public static ForProgramBlock createDeepCopyForProgramBlock(ForProgramBlock fpb, long pid, int IDPrefix, Program prog, HashSet<String> fnStack, HashSet<String> fnCreated, boolean plain, boolean forceDeepCopy) { + ForProgramBlock tmpPB = new ForProgramBlock(prog,fpb.getIterVar()); + tmpPB.setStatementBlock( createForStatementBlockCopy((ForStatementBlock)fpb.getStatementBlock(), pid, plain, forceDeepCopy)); + tmpPB.setThreadID(pid); + tmpPB.setFromInstructions( createDeepCopyInstructionSet(fpb.getFromInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) ); + tmpPB.setToInstructions( createDeepCopyInstructionSet(fpb.getToInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) ); + tmpPB.setIncrementInstructions( createDeepCopyInstructionSet(fpb.getIncrementInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) ); + tmpPB.setExitInstructions( createDeepCopyInstructionSet(fpb.getExitInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) ); + tmpPB.setChildBlocks( rcreateDeepCopyProgramBlocks(fpb.getChildBlocks(), pid, IDPrefix, fnStack, fnCreated, plain, forceDeepCopy) ); + return tmpPB; + } + + public static ForProgramBlock createShallowCopyForProgramBlock(ForProgramBlock fpb, Program prog ) { + ForProgramBlock tmpPB = new ForProgramBlock(prog,fpb.getIterVar()); + tmpPB.setFromInstructions( fpb.getFromInstructions() ); + tmpPB.setToInstructions( fpb.getToInstructions() ); + tmpPB.setIncrementInstructions( fpb.getIncrementInstructions() ); + tmpPB.setExitInstructions( fpb.getExitInstructions() ); + tmpPB.setChildBlocks( fpb.getChildBlocks() ); + return tmpPB; + } + + public static ParForProgramBlock createDeepCopyParForProgramBlock(ParForProgramBlock pfpb, long pid, int IDPrefix, Program prog, HashSet<String> fnStack, HashSet<String> fnCreated, boolean plain, boolean forceDeepCopy) { + ParForProgramBlock tmpPB = null; + + if( IDPrefix == -1 ) //still on master node + tmpPB = new ParForProgramBlock(prog,pfpb.getIterVar(), pfpb.getParForParams(), pfpb.getResultVariables()); + else //child of remote ParWorker at any level + tmpPB = new ParForProgramBlock(IDPrefix, prog, pfpb.getIterVar(), pfpb.getParForParams(), pfpb.getResultVariables()); + + tmpPB.setStatementBlock( createForStatementBlockCopy( (ForStatementBlock) pfpb.getStatementBlock(), pid, plain, forceDeepCopy) ); + tmpPB.setThreadID(pid); + + tmpPB.disableOptimization(); //already done in top-level parfor + tmpPB.disableMonitorReport(); //already done in top-level parfor + + tmpPB.setFromInstructions( createDeepCopyInstructionSet(pfpb.getFromInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) ); + tmpPB.setToInstructions( createDeepCopyInstructionSet(pfpb.getToInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) ); + tmpPB.setIncrementInstructions( createDeepCopyInstructionSet(pfpb.getIncrementInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) ); + tmpPB.setExitInstructions( createDeepCopyInstructionSet(pfpb.getExitInstructions(), pid, IDPrefix, prog, 
fnStack, fnCreated, plain, true) ); + + //NOTE: Normally, no recursive copy because (1) copied on each execution in this PB anyway + //and (2) leave placeholders as they are. However, if plain, an explicit deep copy is requested. + if( plain || forceDeepCopy ) + tmpPB.setChildBlocks( rcreateDeepCopyProgramBlocks(pfpb.getChildBlocks(), pid, IDPrefix, fnStack, fnCreated, plain, forceDeepCopy) ); + else + tmpPB.setChildBlocks( pfpb.getChildBlocks() ); + + return tmpPB; + } + + /** + * This creates a deep copy of a function program block. The central reference to singletons of function program blocks + * poses the need for explicit copies in order to prevent conflicting writes of temporary variables (see ExternalFunctionProgramBlock. + * + * @param namespace function namespace + * @param oldName ? + * @param pid ? + * @param IDPrefix ? + * @param prog runtime program + * @param fnStack ? + * @param fnCreated ? + * @param plain ? + */ + public static void createDeepCopyFunctionProgramBlock(String namespace, String oldName, long pid, int IDPrefix, Program prog, HashSet<String> fnStack, HashSet<String> fnCreated, boolean plain) + { + //fpb guaranteed to be non-null (checked inside getFunctionProgramBlock) + FunctionProgramBlock fpb = prog.getFunctionProgramBlock(namespace, oldName); + String fnameNew = (plain)? oldName :(oldName+Lop.CP_CHILD_THREAD+pid); + String fnameNewKey = DMLProgram.constructFunctionKey(namespace,fnameNew); + + if( prog.getFunctionProgramBlocks().containsKey(fnameNewKey) ) + return; //prevent redundant deep copy if already existent + + //create deep copy + FunctionProgramBlock copy = null; + ArrayList<DataIdentifier> tmp1 = new ArrayList<>(); + ArrayList<DataIdentifier> tmp2 = new ArrayList<>(); + if( fpb.getInputParams()!= null ) + tmp1.addAll(fpb.getInputParams()); + if( fpb.getOutputParams()!= null ) + tmp2.addAll(fpb.getOutputParams()); + + if( fpb instanceof ExternalFunctionProgramBlockCP ) { + ExternalFunctionProgramBlockCP efpb = (ExternalFunctionProgramBlockCP) fpb; + HashMap<String,String> tmp3 = efpb.getOtherParams(); + if( IDPrefix!=-1 ) + copy = new ExternalFunctionProgramBlockCP(prog,tmp1,tmp2,tmp3,saveReplaceFilenameThreadID(efpb.getBaseDir(),Lop.CP_CHILD_THREAD+IDPrefix,Lop.CP_CHILD_THREAD+pid)); + else + copy = new ExternalFunctionProgramBlockCP(prog,tmp1,tmp2,tmp3,saveReplaceFilenameThreadID(efpb.getBaseDir(),Lop.CP_ROOT_THREAD_ID,Lop.CP_CHILD_THREAD+pid)); + } + else if( fpb instanceof ExternalFunctionProgramBlock ) { + ExternalFunctionProgramBlock efpb = (ExternalFunctionProgramBlock) fpb; + HashMap<String,String> tmp3 = efpb.getOtherParams(); + if( IDPrefix!=-1 ) + copy = new ExternalFunctionProgramBlock(prog,tmp1,tmp2,tmp3,saveReplaceFilenameThreadID(efpb.getBaseDir(),Lop.CP_CHILD_THREAD+IDPrefix, Lop.CP_CHILD_THREAD+pid)); + else + copy = new ExternalFunctionProgramBlock(prog,tmp1,tmp2,tmp3,saveReplaceFilenameThreadID(efpb.getBaseDir(),Lop.CP_ROOT_THREAD_ID, Lop.CP_CHILD_THREAD+pid)); + } + else { + if( !fnStack.contains(fnameNewKey) ) { + fnStack.add(fnameNewKey); + copy = new FunctionProgramBlock(prog, tmp1, tmp2); + copy.setChildBlocks( rcreateDeepCopyProgramBlocks(fpb.getChildBlocks(), pid, IDPrefix, fnStack, fnCreated, plain, fpb.isRecompileOnce()) ); + copy.setRecompileOnce( fpb.isRecompileOnce() ); + copy.setThreadID(pid); + fnStack.remove(fnameNewKey); + } + else //stop deep copy for recursive function calls + copy = fpb; + } + + //copy.setVariables( (LocalVariableMap) fpb.getVariables() ); //implicit cloning + //note: instructions not used by 
function program block + + //put + prog.addFunctionProgramBlock(namespace, fnameNew, copy); + fnCreated.add(DMLProgram.constructFunctionKey(namespace, fnameNew)); + } + + public static FunctionProgramBlock createDeepCopyFunctionProgramBlock(FunctionProgramBlock fpb, HashSet<String> fnStack, HashSet<String> fnCreated) + { + if( fpb == null ) + throw new DMLRuntimeException("Unable to create a deep copy of a non-existing FunctionProgramBlock."); + + //create deep copy + FunctionProgramBlock copy = null; + ArrayList<DataIdentifier> tmp1 = new ArrayList<>(); + ArrayList<DataIdentifier> tmp2 = new ArrayList<>(); + if( fpb.getInputParams()!= null ) + tmp1.addAll(fpb.getInputParams()); + if( fpb.getOutputParams()!= null ) + tmp2.addAll(fpb.getOutputParams()); + + copy = new FunctionProgramBlock(fpb.getProgram(), tmp1, tmp2); + copy.setChildBlocks( rcreateDeepCopyProgramBlocks(fpb.getChildBlocks(), 0, -1, fnStack, fnCreated, true, fpb.isRecompileOnce()) ); + copy.setStatementBlock( fpb.getStatementBlock() ); + copy.setRecompileOnce(fpb.isRecompileOnce()); + //copy.setVariables( (LocalVariableMap) fpb.getVariables() ); //implicit cloning + //note: instructions not used by function program block + + return copy; + } + + + /** + * Creates a deep copy of an array of instructions and replaces the placeholders of parworker + * IDs with the concrete IDs of this parfor instance. This is a helper method uses for generating + * deep copies of program blocks. + * + * @param instSet list of instructions + * @param pid ? + * @param IDPrefix ? + * @param prog runtime program + * @param fnStack ? + * @param fnCreated ? + * @param plain ? + * @param cpFunctions ? + * @return list of instructions + */ + public static ArrayList<Instruction> createDeepCopyInstructionSet(ArrayList<Instruction> instSet, long pid, int IDPrefix, Program prog, HashSet<String> fnStack, HashSet<String> fnCreated, boolean plain, boolean cpFunctions) { + ArrayList<Instruction> tmp = new ArrayList<>(); + for( Instruction inst : instSet ) { + if( inst instanceof FunctionCallCPInstruction && cpFunctions ) { + FunctionCallCPInstruction finst = (FunctionCallCPInstruction) inst; + createDeepCopyFunctionProgramBlock( finst.getNamespace(), + finst.getFunctionName(), pid, IDPrefix, prog, fnStack, fnCreated, plain ); + } + tmp.add( cloneInstruction( inst, pid, plain, cpFunctions ) ); + } + return tmp; + } + + public static Instruction cloneInstruction( Instruction oInst, long pid, boolean plain, boolean cpFunctions ) + { + Instruction inst = null; + String tmpString = oInst.toString(); + + try + { + if( oInst instanceof CPInstruction || oInst instanceof SPInstruction || oInst instanceof MRInstruction + || oInst instanceof GPUInstruction ) { + if( oInst instanceof FunctionCallCPInstruction && cpFunctions ) { + FunctionCallCPInstruction tmp = (FunctionCallCPInstruction) oInst; + if( !plain ) { + //safe replacement because target variables might include the function name + //note: this is no update-in-place in order to keep the original function name as basis + tmpString = tmp.updateInstStringFunctionName(tmp.getFunctionName(), tmp.getFunctionName() + Lop.CP_CHILD_THREAD+pid); + } + //otherwise: preserve function name + } + inst = InstructionParser.parseSingleInstruction(tmpString); + } + else if( oInst instanceof MRJobInstruction ) { + //clone via copy constructor + inst = new MRJobInstruction( (MRJobInstruction)oInst ); + } + else + throw new DMLRuntimeException("Failed to clone instruction: "+oInst); + } + catch(Exception ex) { + throw new 
DMLRuntimeException(ex); + } + + //save replacement of thread id references in instructions + inst = saveReplaceThreadID( inst, Lop.CP_ROOT_THREAD_ID, Lop.CP_CHILD_THREAD+pid); + + return inst; + } + + public static StatementBlock createStatementBlockCopy( StatementBlock sb, long pid, boolean plain, boolean forceDeepCopy ) + { + StatementBlock ret = null; + + try + { + if( ConfigurationManager.getCompilerConfigFlag(ConfigType.ALLOW_PARALLEL_DYN_RECOMPILATION) + && sb != null //forced deep copy for function recompilation + && (Recompiler.requiresRecompilation( sb.getHops() ) || forceDeepCopy) ) + { + //create new statement (shallow copy livein/liveout for recompile, line numbers for explain) + ret = new StatementBlock(); + ret.setDMLProg(sb.getDMLProg()); + ret.setParseInfo(sb); + ret.setLiveIn( sb.liveIn() ); + ret.setLiveOut( sb.liveOut() ); + ret.setUpdatedVariables( sb.variablesUpdated() ); + ret.setReadVariables( sb.variablesRead() ); + + //deep copy hops dag for concurrent recompile + ArrayList<Hop> hops = Recompiler.deepCopyHopsDag( sb.getHops() ); + if( !plain ) + Recompiler.updateFunctionNames( hops, pid ); + ret.setHops( hops ); + ret.updateRecompilationFlag(); + } + else { + ret = sb; + } + } + catch( Exception ex ) { + throw new DMLRuntimeException( ex ); + } + + return ret; + } + + public static IfStatementBlock createIfStatementBlockCopy( IfStatementBlock sb, long pid, boolean plain, boolean forceDeepCopy ) + { + IfStatementBlock ret = null; + + try + { + if( ConfigurationManager.getCompilerConfigFlag(ConfigType.ALLOW_PARALLEL_DYN_RECOMPILATION) + && sb != null //forced deep copy for function recompile + && (Recompiler.requiresRecompilation( sb.getPredicateHops() ) || forceDeepCopy) ) + { + //create new statement (shallow copy livein/liveout for recompile, line numbers for explain) + ret = new IfStatementBlock(); + ret.setDMLProg(sb.getDMLProg()); + ret.setParseInfo(sb); + ret.setLiveIn( sb.liveIn() ); + ret.setLiveOut( sb.liveOut() ); + ret.setUpdatedVariables( sb.variablesUpdated() ); + ret.setReadVariables( sb.variablesRead() ); + + //shallow copy child statements + ret.setStatements( sb.getStatements() ); + + //deep copy predicate hops dag for concurrent recompile + Hop hops = Recompiler.deepCopyHopsDag( sb.getPredicateHops() ); + ret.setPredicateHops( hops ); + ret.updatePredicateRecompilationFlag(); + } + else { + ret = sb; + } + } + catch( Exception ex ) { + throw new DMLRuntimeException( ex ); + } + + return ret; + } + + public static WhileStatementBlock createWhileStatementBlockCopy( WhileStatementBlock sb, long pid, boolean plain, boolean forceDeepCopy ) + { + WhileStatementBlock ret = null; + + try + { + if( ConfigurationManager.getCompilerConfigFlag(ConfigType.ALLOW_PARALLEL_DYN_RECOMPILATION) + && sb != null //forced deep copy for function recompile + && (Recompiler.requiresRecompilation( sb.getPredicateHops() ) || forceDeepCopy) ) + { + //create new statement (shallow copy livein/liveout for recompile, line numbers for explain) + ret = new WhileStatementBlock(); + ret.setDMLProg(sb.getDMLProg()); + ret.setParseInfo(sb); + ret.setLiveIn( sb.liveIn() ); + ret.setLiveOut( sb.liveOut() ); + ret.setUpdatedVariables( sb.variablesUpdated() ); + ret.setReadVariables( sb.variablesRead() ); + ret.setUpdateInPlaceVars( sb.getUpdateInPlaceVars() ); + + //shallow copy child statements + ret.setStatements( sb.getStatements() ); + + //deep copy predicate hops dag for concurrent recompile + Hop hops = Recompiler.deepCopyHopsDag( sb.getPredicateHops() ); + ret.setPredicateHops( 
hops ); + ret.updatePredicateRecompilationFlag(); + } + else { + ret = sb; + } + } + catch( Exception ex ) { + throw new DMLRuntimeException( ex ); + } + + return ret; + } + + public static ForStatementBlock createForStatementBlockCopy( ForStatementBlock sb, long pid, boolean plain, boolean forceDeepCopy ) + { + ForStatementBlock ret = null; + + try + { + if( ConfigurationManager.getCompilerConfigFlag(ConfigType.ALLOW_PARALLEL_DYN_RECOMPILATION) + && sb != null + && ( Recompiler.requiresRecompilation(sb.getFromHops()) || + Recompiler.requiresRecompilation(sb.getToHops()) || + Recompiler.requiresRecompilation(sb.getIncrementHops()) || + forceDeepCopy ) ) + { + ret = (sb instanceof ParForStatementBlock) ? new ParForStatementBlock() : new ForStatementBlock(); + + //create new statement (shallow copy livein/liveout for recompile, line numbers for explain) + ret.setDMLProg(sb.getDMLProg()); + ret.setParseInfo(sb); + ret.setLiveIn( sb.liveIn() ); + ret.setLiveOut( sb.liveOut() ); + ret.setUpdatedVariables( sb.variablesUpdated() ); + ret.setReadVariables( sb.variablesRead() ); + ret.setUpdateInPlaceVars( sb.getUpdateInPlaceVars() ); + + //shallow copy child statements + ret.setStatements( sb.getStatements() ); + + //deep copy predicate hops dag for concurrent recompile + if( sb.requiresFromRecompilation() ){ + Hop hops = Recompiler.deepCopyHopsDag( sb.getFromHops() ); + ret.setFromHops( hops ); + } + if( sb.requiresToRecompilation() ){ + Hop hops = Recompiler.deepCopyHopsDag( sb.getToHops() ); + ret.setToHops( hops ); + } + if( sb.requiresIncrementRecompilation() ){ + Hop hops = Recompiler.deepCopyHopsDag( sb.getIncrementHops() ); + ret.setIncrementHops( hops ); + } + ret.updatePredicateRecompilationFlags(); + } + else { + ret = sb; + } + } + catch( Exception ex ) { + throw new DMLRuntimeException( ex ); + } + + return ret; + } + + + //////////////////////////////// + // SERIALIZATION + //////////////////////////////// + + public static String serializeSparkPSBody(SparkPSBody body, HashMap<String, byte[]> clsMap) { + + ExecutionContext ec = body.getEc(); + StringBuilder builder = new StringBuilder(); + builder.append(PSBODY_BEGIN); + builder.append(NEWLINE); + + //handle DMLScript UUID (propagate original uuid for writing to scratch space) + builder.append(DMLScript.getUUID()); + builder.append(COMPONENTS_DELIM); + builder.append(NEWLINE); + + //handle DML config + builder.append(ConfigurationManager.getDMLConfig().serializeDMLConfig()); + builder.append(COMPONENTS_DELIM); + builder.append(NEWLINE); + + //handle additional configurations + builder.append(CONF_STATS + "=" + DMLScript.STATISTICS); + builder.append(COMPONENTS_DELIM); + builder.append(NEWLINE); + + //handle program + builder.append(PROG_BEGIN); + builder.append(NEWLINE); + builder.append(rSerializeFunctionProgramBlocks(ec.getProgram().getFunctionProgramBlocks(), + new HashSet<>(ec.getProgram().getFunctionProgramBlocks().keySet()), clsMap)); + builder.append(PROG_END); + builder.append(NEWLINE); + builder.append(COMPONENTS_DELIM); + builder.append(NEWLINE); + + //handle execution context + builder.append(EC_BEGIN); + builder.append(serializeExecutionContext(ec)); + builder.append(EC_END); + builder.append(NEWLINE); + builder.append(COMPONENTS_DELIM); + builder.append(NEWLINE); + + //handle program blocks + builder.append(PBS_BEGIN); + builder.append(NEWLINE); + builder.append(rSerializeProgramBlocks(ec.getProgram().getProgramBlocks(), clsMap)); + builder.append(PBS_END); + builder.append(NEWLINE); + + builder.append(PSBODY_END); + + 
return builder.toString(); + } + + public static String serializeParForBody( ParForBody body ) { + return serializeParForBody(body, new HashMap<String, byte[]>()); + } + + public static String serializeParForBody( ParForBody body, HashMap<String,byte[]> clsMap ) + { + ArrayList<ProgramBlock> pbs = body.getChildBlocks(); + ArrayList<ResultVar> rVnames = body.getResultVariables(); + ExecutionContext ec = body.getEc(); + + if( pbs.isEmpty() ) + return PARFORBODY_BEGIN + PARFORBODY_END; + + Program prog = pbs.get( 0 ).getProgram(); + + StringBuilder sb = new StringBuilder(); + sb.append( PARFORBODY_BEGIN ); + sb.append( NEWLINE ); + + //handle DMLScript UUID (propagate original uuid for writing to scratch space) + sb.append( DMLScript.getUUID() ); + sb.append( COMPONENTS_DELIM ); + sb.append( NEWLINE ); + + //handle DML config + sb.append( ConfigurationManager.getDMLConfig().serializeDMLConfig() ); + sb.append( COMPONENTS_DELIM ); + sb.append( NEWLINE ); + + //handle additional configurations + sb.append( CONF_STATS + "=" + DMLScript.STATISTICS ); + sb.append( COMPONENTS_DELIM ); + sb.append( NEWLINE ); + + //handle program + sb.append(PROG_BEGIN); + sb.append( NEWLINE ); + sb.append( serializeProgram(prog, pbs, clsMap) ); + sb.append(PROG_END); + sb.append( NEWLINE ); + sb.append( COMPONENTS_DELIM ); + sb.append( NEWLINE ); + + //handle result variable names + sb.append( serializeResultVariables(rVnames) ); + sb.append( COMPONENTS_DELIM ); + + //handle execution context + //note: this includes also the symbol table (serialize only the top-level variable map, + // (symbol tables for nested/child blocks are created at parse time, on the remote side) + sb.append(EC_BEGIN); + sb.append( serializeExecutionContext(ec) ); + sb.append(EC_END); + sb.append( NEWLINE ); + sb.append( COMPONENTS_DELIM ); + sb.append( NEWLINE ); + + //handle program blocks + sb.append(PBS_BEGIN); + sb.append( NEWLINE ); + sb.append( rSerializeProgramBlocks(pbs, clsMap) ); + sb.append(PBS_END); + sb.append( NEWLINE ); + + sb.append( PARFORBODY_END ); + + return sb.toString(); + } + + private static String serializeProgram( Program prog, ArrayList<ProgramBlock> pbs, HashMap<String, byte[]> clsMap ) { + //note program contains variables, programblocks and function program blocks + //but in order to avoid redundancy, we only serialize function program blocks + HashMap<String, FunctionProgramBlock> fpb = prog.getFunctionProgramBlocks(); + HashSet<String> cand = new HashSet<>(); + rFindSerializationCandidates(pbs, cand); + return rSerializeFunctionProgramBlocks( fpb, cand, clsMap ); + } + + private static void rFindSerializationCandidates( ArrayList<ProgramBlock> pbs, HashSet<String> cand ) + { + for( ProgramBlock pb : pbs ) + { + if( pb instanceof WhileProgramBlock ) { + WhileProgramBlock wpb = (WhileProgramBlock) pb; + rFindSerializationCandidates(wpb.getChildBlocks(), cand ); + } + else if ( pb instanceof ForProgramBlock || pb instanceof ParForProgramBlock ) { + ForProgramBlock fpb = (ForProgramBlock) pb; + rFindSerializationCandidates(fpb.getChildBlocks(), cand); + } + else if ( pb instanceof IfProgramBlock ) { + IfProgramBlock ipb = (IfProgramBlock) pb; + rFindSerializationCandidates(ipb.getChildBlocksIfBody(), cand); + if( ipb.getChildBlocksElseBody() != null ) + rFindSerializationCandidates(ipb.getChildBlocksElseBody(), cand); + } + else { //all generic program blocks + for( Instruction inst : pb.getInstructions() ) + if( inst instanceof FunctionCallCPInstruction ) { + FunctionCallCPInstruction fci = 
(FunctionCallCPInstruction) inst; + String fkey = DMLProgram.constructFunctionKey(fci.getNamespace(), fci.getFunctionName()); + if( !cand.contains(fkey) ) { //memoization for multiple calls, recursion + cand.add( fkey ); //add to candidates + //investigate chains of function calls + FunctionProgramBlock fpb = pb.getProgram().getFunctionProgramBlock(fci.getNamespace(), fci.getFunctionName()); + rFindSerializationCandidates(fpb.getChildBlocks(), cand); + } + } + } + } + } + + private static String serializeVariables (LocalVariableMap vars) { + StringBuilder sb = new StringBuilder(); + sb.append(VARS_BEGIN); + sb.append( vars.serialize() ); + sb.append(VARS_END); + return sb.toString(); + } + + public static String serializeDataObject(String key, Data dat) + { + // SCHEMA: <name>|<datatype>|<valuetype>|value + // (scalars are serialize by value, matrices by filename) + StringBuilder sb = new StringBuilder(); + //prepare data for serialization + String name = key; + DataType datatype = dat.getDataType(); + ValueType valuetype = dat.getValueType(); + String value = null; + String[] metaData = null; + String[] listData = null; + switch( datatype ) + { + case SCALAR: + ScalarObject so = (ScalarObject) dat; + //name = so.getName(); + value = so.getStringValue(); + break; + case MATRIX: + MatrixObject mo = (MatrixObject) dat; + MetaDataFormat md = (MetaDataFormat) dat.getMetaData(); + MatrixCharacteristics mc = md.getMatrixCharacteristics(); + value = mo.getFileName(); + PartitionFormat partFormat = (mo.getPartitionFormat()!=null) ? new PartitionFormat( + mo.getPartitionFormat(),mo.getPartitionSize()) : PartitionFormat.NONE; + metaData = new String[9]; + metaData[0] = String.valueOf( mc.getRows() ); + metaData[1] = String.valueOf( mc.getCols() ); + metaData[2] = String.valueOf( mc.getRowsPerBlock() ); + metaData[3] = String.valueOf( mc.getColsPerBlock() ); + metaData[4] = String.valueOf( mc.getNonZeros() ); + metaData[5] = InputInfo.inputInfoToString( md.getInputInfo() ); + metaData[6] = OutputInfo.outputInfoToString( md.getOutputInfo() ); + metaData[7] = String.valueOf( partFormat ); + metaData[8] = String.valueOf( mo.getUpdateType() ); + break; + case LIST: + // SCHEMA: <name>|<datatype>|<valuetype>|value|<metadata>|<tab>element1<tab>element2<tab>element3 (this is the list) + // (for the element1) <listName-index>|<datatype>|<valuetype>|value + // (for the element2) <listName-index>|<datatype>|<valuetype>|value + ListObject lo = (ListObject) dat; + value = REF; + metaData = new String[2]; + metaData[0] = String.valueOf(lo.getLength()); + metaData[1] = lo.getNames() == null ? EMPTY : serializeList(lo.getNames(), ELEMENT_DELIM2); + listData = new String[lo.getLength()]; + for (int index = 0; index < lo.getLength(); index++) { + listData[index] = serializeDataObject(name + DASH + index, lo.slice(index)); + } + break; + default: + throw new DMLRuntimeException("Unable to serialize datatype "+datatype); + } + + //serialize data + sb.append(name); + sb.append(DATA_FIELD_DELIM); + sb.append(datatype); + sb.append(DATA_FIELD_DELIM); + sb.append(valuetype); + sb.append(DATA_FIELD_DELIM); + sb.append(value); + if( metaData != null ) + for( int i=0; i<metaData.length; i++ ) { + sb.append(DATA_FIELD_DELIM); + sb.append(metaData[i]); + } + if (listData != null) { + sb.append(DATA_FIELD_DELIM); + for (String ld : listData) { + sb.append(LIST_ELEMENT_DELIM); + sb.append(ld); + } + } + + return sb.toString(); + } + + private static String serializeExecutionContext( ExecutionContext ec ) { + return (ec != null) ? 
serializeVariables( ec.getVariables() ) : EMPTY; + } + + @SuppressWarnings("all") + private static String serializeInstructions( ArrayList<Instruction> inst, HashMap<String, byte[]> clsMap ) + { + StringBuilder sb = new StringBuilder(); + int count = 0; + for( Instruction linst : inst ) { + //check that only cp instructions are transmitted + if( !( linst instanceof CPInstruction || linst instanceof ExternalFunctionInvocationInstruction ) ) + throw new DMLRuntimeException( NOT_SUPPORTED_MR_INSTRUCTION + " " +linst.getClass().getName()+"\n"+linst ); + + //obtain serialized version of generated classes + if( linst instanceof SpoofCPInstruction ) { + Class<?> cla = ((SpoofCPInstruction) linst).getOperatorClass(); + clsMap.put(cla.getName(), CodegenUtils.getClassData(cla.getName())); + } + + if( count > 0 ) + sb.append( ELEMENT_DELIM ); + + sb.append( checkAndReplaceLiterals( linst.toString() ) ); + count++; + } + + return sb.toString(); + } + + /** + * Replacement of internal delimiters occurring in literals of instructions + * in order to ensure robustness of serialization and parsing. + * (e.g. print( "a,b" ) would break the parsing of instructions that internally + * are separated with a "," ) + * + * @param instStr instruction string + * @return instruction string with replacements + */ + private static String checkAndReplaceLiterals( String instStr ) + { + String tmp = instStr; + + //1) check own delimiters (very unlikely due to special characters) + if( tmp.contains(COMPONENTS_DELIM) ) { + tmp = tmp.replaceAll(COMPONENTS_DELIM, "."); + LOG.warn("Replaced special literal character sequence "+COMPONENTS_DELIM+" with '.'"); + } + + if( tmp.contains(ELEMENT_DELIM) ) { + tmp = tmp.replaceAll(ELEMENT_DELIM, "."); + LOG.warn("Replaced special literal character sequence "+ELEMENT_DELIM+" with '.'"); + } + + if( tmp.contains( LEVELIN ) ){ + tmp = tmp.replaceAll(LEVELIN, "("); // '\\' required if LEVELIN='{' because regex + LOG.warn("Replaced special literal character sequence "+LEVELIN+" with '('"); + } + + if( tmp.contains(LEVELOUT) ){ + tmp = tmp.replaceAll(LEVELOUT, ")"); + LOG.warn("Replaced special literal character sequence "+LEVELOUT+" with ')'"); + } + + //NOTE: DATA_FIELD_DELIM and KEY_VALUE_DELIM not required + //because those literals cannot occur in critical places. + + //2) check end tag of CDATA + if( tmp.contains(CDATA_END) ){ + tmp = tmp.replaceAll(CDATA_END, "."); //prevent XML parsing issues in job.xml + LOG.warn("Replaced special literal character sequence "+ CDATA_END +" with '.'"); + } + + return tmp; + } + + private static String serializeStringHashMap(HashMap<String,String> vars) { + return serializeList(vars.entrySet().stream().map(e -> + e.getKey()+KEY_VALUE_DELIM+e.getValue()).collect(Collectors.toList())); + } + + public static String serializeResultVariables( List<ResultVar> vars) { + return serializeList(vars.stream().map(v -> v._isAccum ?
+ v._name+"+" : v._name).collect(Collectors.toList())); + } + + public static String serializeList(List<String> elements) { + return serializeList(elements, ELEMENT_DELIM); + } + + public static String serializeList(List<String> elements, String delim) { + return StringUtils.join(elements, delim); + } + + private static String serializeDataIdentifiers(List<DataIdentifier> vars) { + return serializeList(vars.stream().map(v -> + serializeDataIdentifier(v)).collect(Collectors.toList())); + } + + private static String serializeDataIdentifier( DataIdentifier dat ) { + // SCHEMA: <name>|<datatype>|<valuetype> + StringBuilder sb = new StringBuilder(); + sb.append(dat.getName()); + sb.append(DATA_FIELD_DELIM); + sb.append(dat.getDataType()); + sb.append(DATA_FIELD_DELIM); + sb.append(dat.getValueType()); + return sb.toString(); + } + + private static String rSerializeFunctionProgramBlocks(HashMap<String,FunctionProgramBlock> pbs, HashSet<String> cand, HashMap<String, byte[]> clsMap) { + StringBuilder sb = new StringBuilder(); + int count = 0; + for( Entry<String,FunctionProgramBlock> pb : pbs.entrySet() ) { + if( !cand.contains(pb.getKey()) ) //skip function not included in the parfor body + continue; + if( count>0 ) { + sb.append( ELEMENT_DELIM ); + sb.append( NEWLINE ); + } + sb.append( pb.getKey() ); + sb.append( KEY_VALUE_DELIM ); + sb.append( rSerializeProgramBlock(pb.getValue(), clsMap) ); + count++; + } + sb.append(NEWLINE); + return sb.toString(); + } + + private static String rSerializeProgramBlocks(ArrayList<ProgramBlock> pbs, HashMap<String, byte[]> clsMap) { + StringBuilder sb = new StringBuilder(); + int count = 0; + for( ProgramBlock pb : pbs ) { + if( count>0 ) { + sb.append( ELEMENT_DELIM ); + sb.append(NEWLINE); + } + sb.append( rSerializeProgramBlock(pb, clsMap) ); + count++; + } + return sb.toString(); + } + + private static String rSerializeProgramBlock( ProgramBlock pb, HashMap<String, byte[]> clsMap ) { + StringBuilder sb = new StringBuilder(); + + //handle header + if( pb instanceof WhileProgramBlock ) + sb.append(PB_WHILE); + else if ( pb instanceof ForProgramBlock && !(pb instanceof ParForProgramBlock) ) + sb.append(PB_FOR); + else if ( pb instanceof ParForProgramBlock ) + sb.append(PB_PARFOR); + else if ( pb instanceof IfProgramBlock ) + sb.append(PB_IF); + else if ( pb instanceof FunctionProgramBlock && !(pb instanceof ExternalFunctionProgramBlock) ) + sb.append(PB_FC); + else if ( pb instanceof ExternalFunctionProgramBlock ) + sb.append(PB_EFC); + else //all generic program blocks + sb.append(PB_BEGIN); + + //handle body + if( pb instanceof WhileProgramBlock ) + { + WhileProgramBlock wpb = (WhileProgramBlock) pb; + sb.append(INST_BEGIN); + sb.append( serializeInstructions( wpb.getPredicate(), clsMap ) ); + sb.append(INST_END); + sb.append( COMPONENTS_DELIM ); + sb.append(INST_BEGIN); + sb.append( serializeInstructions( wpb.getExitInstructions(), clsMap ) ); + sb.append(INST_END); + sb.append( COMPONENTS_DELIM ); + sb.append(PBS_BEGIN); + sb.append( rSerializeProgramBlocks( wpb.getChildBlocks(), clsMap) ); + sb.append(PBS_END); + } + else if ( pb instanceof ForProgramBlock && !(pb instanceof ParForProgramBlock ) ) + { + ForProgramBlock fpb = (ForProgramBlock) pb; + sb.append( fpb.getIterVar() ); + sb.append( COMPONENTS_DELIM ); + sb.append(INST_BEGIN); + sb.append( serializeInstructions( fpb.getFromInstructions(), clsMap ) ); + sb.append(INST_END); + sb.append( COMPONENTS_DELIM ); + sb.append(INST_BEGIN); + sb.append( serializeInstructions(fpb.getToInstructions(), 
clsMap) ); + sb.append(INST_END); + sb.append( COMPONENTS_DELIM ); + sb.append(INST_BEGIN); + sb.append( serializeInstructions(fpb.getIncrementInstructions(), clsMap) ); + sb.append(INST_END); + sb.append( COMPONENTS_DELIM ); + sb.append(INST_BEGIN); + sb.append( serializeInstructions(fpb.getExitInstructions(), clsMap) ); + sb.append(INST_END); + sb.append( COMPONENTS_DELIM ); + sb.append(PBS_BEGIN); + sb.append( rSerializeProgramBlocks(fpb.getChildBlocks(), clsMap) ); + sb.append(PBS_END); + } + else if ( pb instanceof ParForProgramBlock ) + { + ParForProgramBlock pfpb = (ParForProgramBlock) pb; + + //check for nested remote ParFOR + if( PExecMode.valueOf( pfpb.getParForParams().get( ParForStatementBlock.EXEC_MODE )) == PExecMode.REMOTE_MR ) + throw new DMLRuntimeException( NOT_SUPPORTED_MR_PARFOR ); + + sb.append( pfpb.getIterVar() ); + sb.append( COMPONENTS_DELIM ); + sb.append( serializeResultVariables( pfpb.getResultVariables()) ); + sb.append( COMPONENTS_DELIM ); + sb.append( serializeStringHashMap( pfpb.getParForParams()) ); //parameters of nested parfor + sb.append( COMPONENTS_DELIM ); + sb.append(INST_BEGIN); + sb.append( serializeInstructions(pfpb.getFromInstructions(), clsMap) ); + sb.append(INST_END); + sb.append( COMPONENTS_DELIM ); + sb.append(INST_BEGIN); + sb.append( serializeInstructions(pfpb.getToInstructions(), clsMap) ); + sb.append(INST_END); + sb.append( COMPONENTS_DELIM ); + sb.append(INST_BEGIN); + sb.append( serializeInstructions(pfpb.getIncrementInstructions(), clsMap) ); + sb.append(INST_END); + sb.append( COMPONENTS_DELIM ); + sb.append(INST_BEGIN); + sb.append( serializeInstructions(pfpb.getExitInstructions(), clsMap) ); + sb.append(INST_END); + sb.append( COMPONENTS_DELIM ); + sb.append(PBS_BEGIN); + sb.append( rSerializeProgramBlocks( pfpb.getChildBlocks(), clsMap ) ); + sb.append(PBS_END); + } + else if ( pb instanceof IfProgramBlock ) + { + IfProgramBlock ipb = (IfProgramBlock) pb; + sb.append(INST_BEGIN); + sb.append( serializeInstructions(ipb.getPredicate(), clsMap) ); + sb.append(INST_END); + sb.append( COMPONENTS_DELIM ); + sb.append(INST_BEGIN); + sb.append( serializeInstructions(ipb.getExitInstructions(), clsMap) ); + sb.append(INST_END); + sb.append( COMPONENTS_DELIM ); + sb.append(PBS_BEGIN); + sb.append( rSerializeProgramBlocks(ipb.getChildBlocksIfBody(), clsMap) ); + sb.append(PBS_END); + sb.append( COMPONENTS_DELIM ); + sb.append(PBS_BEGIN); + sb.append( rSerializeProgramBlocks(ipb.getChildBlocksElseBody(), clsMap) ); + sb.append(PBS_END); + } + else if( pb instanceof FunctionProgramBlock && !(pb instanceof ExternalFunctionProgramBlock) ) + { + FunctionProgramBlock fpb = (FunctionProgramBlock) pb; + + sb.append( serializeDataIdentifiers( fpb.getInputParams() ) ); + sb.append( COMPONENTS_DELIM ); + sb.append( serializeDataIdentifiers( fpb.getOutputParams() ) ); + sb.append( COMPONENTS_DELIM ); + sb.append(INST_BEGIN); + sb.append( serializeInstructions(fpb.getInstructions(), clsMap) ); + sb.append(INST_END); + sb.append( COMPONENTS_DELIM ); + sb.append(PBS_BEGIN); + sb.append( rSerializeProgramBlocks(fpb.getChildBlocks(), clsMap) ); + sb.append(PBS_END); + sb.append( COMPONENTS_DELIM ); + } + else if( pb instanceof ExternalFunctionProgramBlock ) + { + if( !(pb instanceof ExternalFunctionProgramBlockCP) ) + { + throw new DMLRuntimeException( NOT_SUPPORTED_EXTERNALFUNCTION_PB ); + } + + ExternalFunctionProgramBlockCP fpb = (ExternalFunctionProgramBlockCP) pb; + + sb.append( serializeDataIdentifiers( fpb.getInputParams() ) ); + sb.append( 
COMPONENTS_DELIM ); + sb.append( serializeDataIdentifiers( fpb.getOutputParams() ) ); + sb.append( COMPONENTS_DELIM ); + sb.append( serializeStringHashMap( fpb.getOtherParams() ) ); + sb.append( COMPONENTS_DELIM ); + sb.append( fpb.getBaseDir() ); + sb.append( COMPONENTS_DELIM ); + + sb.append(INST_BEGIN); + //create on construction anyway + sb.append(INST_END); + sb.append( COMPONENTS_DELIM ); + sb.append(PBS_BEGIN); + sb.append( rSerializeProgramBlocks(fpb.getChildBlocks(), clsMap) ); + sb.append(PBS_END); + } + else //all generic program blocks + { + sb.append(INST_BEGIN); + sb.append( serializeInstructions(pb.getInstructions(), clsMap) ); + sb.append(INST_END); + } + + + //handle end + sb.append(PB_END); + + return sb.toString(); + } + + + //////////////////////////////// + // PARSING + //////////////////////////////// + + public static SparkPSBody parseSparkPSBody(String in, int id) { + SparkPSBody body = new SparkPSBody(); + + //header elimination + String tmpin = in.replaceAll(NEWLINE, ""); //normalization + tmpin = tmpin.substring(PSBODY_BEGIN.length(), tmpin.length() - PSBODY_END.length()); //remove start/end + HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(tmpin, COMPONENTS_DELIM); + + //handle DMLScript UUID (NOTE: set directly in DMLScript) + //(master UUID is used for all nodes (in order to simply cleanup)) + DMLScript.setUUID(st.nextToken()); + + //handle DML config (NOTE: set directly in ConfigurationManager) + handleDMLConfig(st.nextToken()); + + //handle additional configs + parseAndSetAdditionalConfigurations(st.nextToken()); + + //handle program + Program prog = parseProgram(st.nextToken(), id); + + //handle execution context + ExecutionContext ec = parseExecutionContext(st.nextToken(), prog); + ec.setProgram(prog); + + //handle program blocks + String spbs = st.nextToken(); + ArrayList<ProgramBlock> pbs = rParseProgramBlocks(spbs, prog, id); + prog.getProgramBlocks().addAll(pbs); + + body.setEc(ec); + return body; + } + + public static ParForBody parseParForBody( String in, int id ) { + return parseParForBody(in, id, false); + } + + public static ParForBody parseParForBody( String in, int id, boolean inSpark ) { + ParForBody body = new ParForBody(); + + //header elimination + String tmpin = in.replaceAll(NEWLINE, ""); //normalization + tmpin = tmpin.substring(PARFORBODY_BEGIN.length(),tmpin.length()-PARFORBODY_END.length()); //remove start/end + HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(tmpin, COMPONENTS_DELIM); + + //handle DMLScript UUID (NOTE: set directly in DMLScript) + //(master UUID is used for all nodes (in order to simply cleanup)) + DMLScript.setUUID( st.nextToken() ); + + //handle DML config (NOTE: set directly in ConfigurationManager) + String confStr = st.nextToken(); + JobConf job = ConfigurationManager.getCachedJobConf(); + if( !InfrastructureAnalyzer.isLocalMode(job) ) { + handleDMLConfig(confStr); + //init internal configuration w/ parsed or default config + ParForProgramBlock.initInternalConfigurations( + ConfigurationManager.getDMLConfig()); + } + + //handle additional configs + String aconfs = st.nextToken(); + if( !inSpark ) + parseAndSetAdditionalConfigurations( aconfs ); + + //handle program + String progStr = st.nextToken(); + Program prog = parseProgram( progStr, id ); + + //handle result variable names + String rvarStr = st.nextToken(); + ArrayList<ResultVar> rvars = parseResultVariables(rvarStr); + body.setResultVariables(rvars); + + //handle execution context + String ecStr = st.nextToken(); + 
ExecutionContext ec = parseExecutionContext( ecStr, prog ); + + //handle program blocks + String spbs = st.nextToken(); + ArrayList<ProgramBlock> pbs = rParseProgramBlocks(spbs, prog, id); + + body.setChildBlocks( pbs ); + body.setEc( ec ); + + return body; + } + + private static void handleDMLConfig(String confStr) { + if(confStr != null && !confStr.trim().isEmpty()) { + DMLConfig dmlconf = DMLConfig.parseDMLConfig(confStr); + CompilerConfig cconf = OptimizerUtils.constructCompilerConfig(dmlconf); + ConfigurationManager.setLocalConfig(dmlconf); + ConfigurationManager.setLocalConfig(cconf); + } + } + + public static Program parseProgram( String in, int id ) { + String lin = in.substring( PROG_BEGIN.length(),in.length()- PROG_END.length()).trim(); + Program prog = new Program(); + HashMap<String,FunctionProgramBlock> fc = parseFunctionProgramBlocks(lin, prog, id); + for( Entry<String,FunctionProgramBlock> e : fc.entrySet() ) { + String[] keypart = e.getKey().split( Program.KEY_DELIM ); + String namespace = keypart[0]; + String name = keypart[1]; + prog.addFunctionProgramBlock(namespace, name, e.getValue()); + } + return prog; + } + + private static LocalVariableMap parseVariables(String in) { + LocalVariableMap ret = null; + if( in.length()> VARS_BEGIN.length() + VARS_END.length()) { + String varStr = in.substring( VARS_BEGIN.length(),in.length()- VARS_END.length()).trim(); + ret = LocalVariableMap.deserialize(varStr); + } + else { //empty input symbol table + ret = new LocalVariableMap(); + } + return ret; + } + + private static HashMap<String,FunctionProgramBlock> parseFunctionProgramBlocks( String in, Program prog, int id ) { + HashMap<String,FunctionProgramBlock> ret = new HashMap<>(); + HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer( in, ELEMENT_DELIM ); + while( st.hasMoreTokens() ) { + String lvar = st.nextToken(); //with ID = CP_CHILD_THREAD+id for current use + //put first copy into prog (for direct use) + int index = lvar.indexOf( KEY_VALUE_DELIM ); + String tmp1 = lvar.substring(0, index); // + CP_CHILD_THREAD+id; + String tmp2 = lvar.substring(index + 1); + ret.put(tmp1, (FunctionProgramBlock)rParseProgramBlock(tmp2, prog, id)); + } + return ret; + } + + private static ArrayList<ProgramBlock> rParseProgramBlocks(String in, Program prog, int id) { + ArrayList<ProgramBlock> pbs = new ArrayList<>(); + String tmpdata = in.substring(PBS_BEGIN.length(),in.length()- PBS_END.length()); + HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(tmpdata, ELEMENT_DELIM); + while( st.hasMoreTokens() ) + pbs.add( rParseProgramBlock( st.nextToken(), prog, id ) ); + return pbs; + } + + private static ProgramBlock rParseProgramBlock( String in, Program prog, int id ) { + ProgramBlock pb = null; + if( in.startsWith(PB_WHILE) ) + pb = rParseWhileProgramBlock( in, prog, id ); + else if ( in.startsWith(PB_FOR) ) + pb = rParseForProgramBlock( in, prog, id ); + else if ( in.startsWith(PB_PARFOR) ) + pb = rParseParForProgramBlock( in, prog, id ); + else if ( in.startsWith(PB_IF) ) + pb = rParseIfProgramBlock( in, prog, id ); + else if ( in.startsWith(PB_FC) ) + pb = rParseFunctionProgramBlock( in, prog, id ); + else if ( in.startsWith(PB_EFC) ) + pb = rParseExternalFunctionProgramBlock( in, prog, id ); + else if ( in.startsWith(PB_BEGIN) ) + pb = rParseGenericProgramBlock( in, prog, id ); + else + throw new DMLRuntimeException( NOT_SUPPORTED_PB+" "+in ); + return pb; + } + + private static WhileProgramBlock rParseWhileProgramBlock( String in, Program prog, int id ) { 
+ String lin = in.substring( PB_WHILE.length(),in.length()- PB_END.length()); + HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM); + + //predicate instructions + ArrayList<Instruction> inst = parseInstructions(st.nextToken(),id); + + //exit instructions + ArrayList<Instruction> exit = parseInstructions(st.nextToken(),id); + + //program blocks + ArrayList<ProgramBlock> pbs = rParseProgramBlocks(st.nextToken(), prog, id); + + WhileProgramBlock wpb = new WhileProgramBlock(prog,inst); + wpb.setExitInstructions2(exit); + wpb.setChildBlocks(pbs); + + return wpb; + } + + private static ForProgramBlock rParseForProgramBlock( String in, Program prog, int id ) { + String lin = in.substring( PB_FOR.length(),in.length()- PB_END.length()); + HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM); + + //inputs + String iterVar = st.nextToken(); + + //instructions + ArrayList<Instruction> from = parseInstructions(st.nextToken(),id); + ArrayList<Instruction> to = parseInstructions(st.nextToken(),id); + ArrayList<Instruction> incr = parseInstructions(st.nextToken(),id); + + //exit instructions + ArrayList<Instruction> exit = parseInstructions(st.nextToken(),id); + + //program blocks + ArrayList<ProgramBlock> pbs = rParseProgramBlocks(st.nextToken(), prog, id); + + ForProgramBlock fpb = new ForProgramBlock(prog, iterVar); + fpb.setFromInstructions(from); + fpb.setToInstructions(to); + fpb.setIncrementInstructions(incr); + fpb.setExitInstructions(exit); + fpb.setChildBlocks(pbs); + + return fpb; + } + + private static ParForProgramBlock rParseParForProgramBlock( String in, Program prog, int id ) { + String lin = in.substring( PB_PARFOR.length(),in.length()- PB_END.length()); + HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM); + + //inputs + String iterVar = st.nextToken(); + ArrayList<ResultVar> resultVars = parseResultVariables(st.nextToken()); + HashMap<String,String> params = parseStringHashMap(st.nextToken()); + + //instructions + ArrayList<Instruction> from = parseInstructions(st.nextToken(), 0); + ArrayList<Instruction> to = parseInstructions(st.nextToken(), 0); + ArrayList<Instruction> incr = parseInstructions(st.nextToken(), 0); + + //exit instructions + ArrayList<Instruction> exit = parseInstructions(st.nextToken(), 0); + + //program blocks //reset id to preinit state, replaced during exec + ArrayList<ProgramBlock> pbs = rParseProgramBlocks(st.nextToken(), prog, 0); + + ParForProgramBlock pfpb = new ParForProgramBlock(id, prog, iterVar, params, resultVars); + pfpb.disableOptimization(); //already done in top-level parfor + pfpb.setFromInstructions(from); + pfpb.setToInstructions(to); + pfpb.setIncrementInstructions(incr); + pfpb.setExitInstructions(exit); + pfpb.setChildBlocks(pbs); + + return pfpb; + } + + private static IfProgramBlock rParseIfProgramBlock( String in, Program prog, int id ) { + String lin = in.substring( PB_IF.length(),in.length()- PB_END.length()); + HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM); + + //predicate instructions + ArrayList<Instruction> inst = parseInstructions(st.nextToken(),id); + + //exit instructions + ArrayList<Instruction> exit = parseInstructions(st.nextToken(),id); + + //program blocks: if and else + ArrayList<ProgramBlock> pbs1 = rParseProgramBlocks(st.nextToken(), prog, id); + ArrayList<ProgramBlock> pbs2 = rParseProgramBlocks(st.nextToken(), prog, id); + + IfProgramBlock ipb = new 
IfProgramBlock(prog,inst); + ipb.setExitInstructions2(exit); + ipb.setChildBlocksIfBody(pbs1); + ipb.setChildBlocksElseBody(pbs2); + + return ipb; + } + + private static FunctionProgramBlock rParseFunctionProgramBlock( String in, Program prog, int id ) { + String lin = in.substring( PB_FC.length(),in.length()- PB_END.length()); + HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM); + + //inputs and outputs + ArrayList<DataIdentifier> dat1 = parseDataIdentifiers(st.nextToken()); + ArrayList<DataIdentifier> dat2 = parseDataIdentifiers(st.nextToken()); + + //instructions + ArrayList<Instruction> inst = parseInstructions(st.nextToken(),id); + + //program blocks + ArrayList<ProgramBlock> pbs = rParseProgramBlocks(st.nextToken(), prog, id); + + ArrayList<DataIdentifier> tmp1 = new ArrayList<>(dat1); + ArrayList<DataIdentifier> tmp2 = new ArrayList<>(dat2); + FunctionProgramBlock fpb = new FunctionProgramBlock(prog, tmp1, tmp2); + fpb.setInstructions(inst); + fpb.setChildBlocks(pbs); + + return fpb; + } + + private static ExternalFunctionProgramBlock rParseExternalFunctionProgramBlock( String in, Program prog, int id ) { + String lin = in.substring( PB_EFC.length(),in.length()- PB_END.length()); + HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM); + + //inputs, outputs and params + ArrayList<DataIdentifier> dat1 = parseDataIdentifiers(st.nextToken()); + ArrayList<DataIdentifier> dat2 = parseDataIdentifiers(st.nextToken()); + HashMap<String,String> dat3 = parseStringHashMap(st.nextToken()); + + //basedir + String basedir = st.nextToken(); + + //instructions (required for removing INST BEGIN, END) + parseInstructions(st.nextToken(),id); + + //program blocks + ArrayList<ProgramBlock> pbs = rParseProgramBlocks(st.nextToken(), prog, id); + + ArrayList<DataIdentifier> tmp1 = new ArrayList<>(dat1); + ArrayList<DataIdentifier> tmp2 = new ArrayList<>(dat2); + + //only CP external functions, because no nested MR jobs for reblocks + ExternalFunctionProgramBlockCP efpb = new ExternalFunctionProgramBlockCP(prog, tmp1, tmp2, dat3, basedir); + efpb.setChildBlocks(pbs); + + return efpb; + } + + private static ProgramBlock rParseGenericProgramBlock( String in, Program prog, int id ) { + String lin = in.substring( PB_BEGIN.length(),in.length()- PB_END.length()); + StringTokenizer st = new StringTokenizer(lin,COMPONENTS_DELIM); + ProgramBlock pb = new ProgramBlock(prog); + pb.setInstructions(parseInstructions(st.nextToken(),id)); + return pb; + } + + private static ArrayList<Instruction> parseInstructions( String in, int id ) { + ArrayList<Instruction> insts = new ArrayList<>(); + String lin = in.substring( INST_BEGIN.length(),in.length()- INST_END.length()); + StringTokenizer st = new StringTokenizer(lin, ELEMENT_DELIM); + while(st.hasMoreTokens()) { + //Note that at this point only CP instructions and External function instruction can occur + String instStr = st.nextToken(); + try { + Instruction tmpinst = CPInstructionParser.parseSingleInstruction(instStr); + tmpinst = saveReplaceThreadID(tmpinst, Lop.CP_ROOT_THREAD_ID, Lop.CP_CHILD_THREAD+id ); + insts.add( tmpinst ); + } + catch(Exception ex) { + throw new DMLRuntimeException("Failed to parse instruction: " + instStr, ex); + } + } + return insts; + } + + private static ArrayList<ResultVar> parseResultVariables(String in) { + ArrayList<ResultVar> ret = new ArrayList<>(); + for(String var : parseStringArrayList(in)) { + boolean accum = var.endsWith("+"); + ret.add(new 
ResultVar(accum ? var.substring(0, var.length()-1) : var, accum)); + } + return ret; + } + + private static HashMap<String,String> parseStringHashMap( String in ) { + HashMap<String,String> vars = new HashMap<>(); + StringTokenizer st = new StringTokenizer(in,ELEMENT_DELIM); + while( st.hasMoreTokens() ) { + String lin = st.nextToken(); + int index = lin.indexOf( KEY_VALUE_DELIM ); + String tmp1 = lin.substring(0, index); + String tmp2 = lin.substring(index + 1); + vars.put(tmp1, tmp2); + } + return vars; + } + + private static ArrayList<String> parseStringArrayList(String in) { + return parseStringArrayList(in, ELEMENT_DELIM); + } + + private static ArrayList<String> parseStringArrayList(String in, String delim) { + StringTokenizer st = new StringTokenizer(in, delim); + ArrayList<String> vars = new ArrayList<>(st.countTokens()); + while( st.hasMoreTokens() ) + vars.add(st.nextToken()); + return vars; + } + + private static ArrayList<DataIdentifier> parseDataIdentifiers( String in ) { + ArrayList<DataIdentifier> vars = new ArrayList<>(); + StringTokenizer st = new StringTokenizer(in, ELEMENT_DELIM); + while( st.hasMoreTokens() ) { + String tmp = st.nextToken(); + DataIdentifier dat = parseDataIdentifier( tmp ); + vars.add(dat); + } + return vars; + } + + private static DataIdentifier parseDataIdentifier( String in ) { + StringTokenizer st = new StringTokenizer(in, DATA_FIELD_DELIM); + DataIdentifier dat = new DataIdentifier(st.nextToken()); + dat.setDataType(DataType.valueOf(st.nextToken())); + dat.setValueType(ValueType.valueOf(st.nextToken())); + return dat; + } + + /** + * NOTE: MRJobConfiguration cannot be used for the general case because program blocks and + * related symbol tables can be hierarchically structured. + * + * @param in data object as string + * @return array of objects + */ + public static Object[] parseDataObject(String in) { + Object[] ret = new Object[2]; + + StringTokenizer st = new StringTokenizer(in, DATA_FIELD_DELIM ); + String name = st.nextToken(); + DataType datatype = DataType.valueOf( st.nextToken() ); + ValueType valuetype = ValueType.valueOf( st.nextToken() ); + String valString = st.hasMoreTokens() ? 
st.nextToken() : ""; + Data dat = null; + switch( datatype ) + { + case SCALAR: { + switch ( valuetype ) { + case INT: dat = new IntObject(Long.parseLong(valString)); break; + case DOUBLE: dat = new DoubleObject(Double.parseDouble(valString)); break; + case BOOLEAN: dat = new BooleanObject(Boolean.parseBoolean(valString)); break; + case STRING: dat = new StringObject(valString); break; + default: + throw new DMLRuntimeException("Unable to parse valuetype "+valuetype); + } + break; + } + case MATRIX: { + MatrixObject mo = new MatrixObject(valuetype,valString); + long rows = Long.parseLong( st.nextToken() ); + long cols = Long.parseLong( st.nextToken() ); + int brows = Integer.parseInt( st.nextToken() ); + int bcols = Integer.parseInt( st.nextToken() ); + long nnz = Long.parseLong( st.nextToken() ); + InputInfo iin = InputInfo.stringToInputInfo( st.nextToken() ); + OutputInfo oin = OutputInfo.stringToOutputInfo( st.nextToken() ); + PartitionFormat partFormat = PartitionFormat.valueOf( st.nextToken() ); + UpdateType inplace = UpdateType.valueOf( st.nextToken() ); + MatrixCharacteristics mc = new MatrixCharacteristics(rows, cols, brows, bcols, nnz); + MetaDataFormat md = new MetaDataFormat( mc, oin, iin ); + mo.setMetaData( md ); + if( partFormat._dpf != PDataPartitionFormat.NONE ) + mo.setPartitioned( partFormat._dpf, partFormat._N ); + mo.setUpdateType(inplace); + dat = mo; + break; + } + case LIST: + int size = Integer.parseInt(st.nextToken()); + String namesStr = st.nextToken(); + List<String> names = namesStr.equals(EMPTY) ? null : + parseStringArrayList(namesStr, ELEMENT_DELIM2); + List<Data> data = new ArrayList<>(size); + st.nextToken(LIST_ELEMENT_DELIM); + for (int i = 0; i < size; i++) { + String dataStr = st.nextToken(); + Object[] obj = parseDataObject(dataStr); + data.add((Data) obj[1]); + } + dat = new ListObject(data, names); + break; + default: + throw new DMLRuntimeException("Unable to parse datatype "+datatype); + } + + ret[0] = name; + ret[1] = dat; + return ret; + } + + private static ExecutionContext parseExecutionContext(String in, Program prog) { + ExecutionContext ec = null; + String lin = in.substring(EC_BEGIN.length(),in.length()- EC_END.length()).trim(); + if( !lin.equals( EMPTY ) ) { + LocalVariableMap vars = parseVariables(lin); + ec = ExecutionContextFactory.createContext( false, prog ); + ec.setVariables(vars); + } + return ec; + } + + private static void parseAndSetAdditionalConfigurations(String conf) { + String[] statsFlag = conf.split("="); + DMLScript.STATISTICS = Boolean.parseBoolean(statsFlag[1]); + } + + ////////// + // CUSTOM SAFE LITERAL REPLACEMENT + + /** + * In-place replacement of thread ids in filenames, functions names etc + * + * @param inst instruction + * @param pattern ? 
+ * @param replacement string replacement + * @return instruction + */ + private static Instruction saveReplaceThreadID( Instruction inst, String pattern, String replacement ) { + //currently known, relevant instructions: createvar, rand, seq, extfunct, + if( inst instanceof MRJobInstruction ) { + //update dims file, and internal string representations of rand/seq instructions + MRJobInstruction mrinst = (MRJobInstruction)inst; + mrinst.updateInstructionThreadID(pattern, replacement); + } + else if ( inst instanceof VariableCPInstruction ) { //createvar, setfilename + //update in-memory representation + inst.updateInstructionThreadID(pattern, replacement); + } + //NOTE> //Rand, seq in CP not required + return inst; + } + + public static String saveReplaceFilenameThreadID(String fname, String pattern, String replace) { + //save replace necessary in order to account for the possibility that read variables have our prefix in the absolute path + //replace the last match only, because (1) we have at most one _t0 and (2) always concatenated to the end. + int pos = fname.lastIndexOf(pattern); + return ( pos < 0 ) ? fname : fname.substring(0, pos) + + replace + fname.substring(pos+pattern.length()); + } + + + ////////// + // CUSTOM HIERARCHICAL TOKENIZER + + + /** + * Custom StringTokenizer for splitting strings of hierarchies. The basic idea is to + * search for delim-Strings on the same hierarchy level, while delims of lower hierarchy + * levels are skipped. + * + */ + private static class HierarchyAwareStringTokenizer //extends StringTokenizer + { + private String _str = null; + private String _del = null; + private int _off = -1; + + public HierarchyAwareStringTokenizer( String in, String delim ) { + //super(in); + _str = in; + _del = delim; + _off = delim.length(); + } + + public boolean hasMoreTokens() { + return (_str.length() > 0); + } + + public String nextToken() { + int nextDelim = determineNextSameLevelIndexOf(_str, _del); + String token = null; + if(nextDelim < 0) { + nextDelim = _str.length(); + _off = 0; + } + token = _str.substring(0,nextDelim); + _str = _str.substring( nextDelim + _off ); + return token; + } + + private static int determineNextSameLevelIndexOf( String data, String pattern ) + { + String tmpdata = data; + int index = 0; + int count = 0; + int off=0,i1,i2,i3,min; + + while(true) { + i1 = tmpdata.indexOf(pattern); + i2 = tmpdata.indexOf(LEVELIN); + i3 = tmpdata.indexOf(LEVELOUT); + + if( i1 < 0 ) return i1; //no pattern found at all + + min = i1; //min >= 0 by definition + if( i2 >= 0 ) min = Math.min(min, i2); + if( i3 >= 0 ) min = Math.min(min, i3); + + //stack maintenance + if( i1 == min && count == 0 ) + return index+i1; + else if( i2 == min ) { + count++; + off = LEVELIN.length(); + } + else if( i3 == min ) { + count--; + off = LEVELOUT.length(); + } + + //prune investigated string + index += min+off; + tmpdata = tmpdata.substring(min+off); + } + } + } +} http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/test/java/org/apache/sysml/test/integration/functions/paramserv/ParamservLocalNNTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/paramserv/ParamservLocalNNTest.java b/src/test/java/org/apache/sysml/test/integration/functions/paramserv/ParamservLocalNNTest.java new file mode 100644 index 0000000..d5fd509 --- /dev/null +++ b/src/test/java/org/apache/sysml/test/integration/functions/paramserv/ParamservLocalNNTest.java @@ -0,0 +1,93 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.test.integration.functions.paramserv; + +import org.apache.sysml.test.integration.AutomatedTestBase; +import org.apache.sysml.test.integration.TestConfiguration; +import org.junit.Test; + +public class ParamservLocalNNTest extends AutomatedTestBase { + + private static final String TEST_NAME1 = "paramserv-nn-bsp-batch-dc"; + private static final String TEST_NAME2 = "paramserv-nn-asp-batch"; + private static final String TEST_NAME3 = "paramserv-nn-bsp-epoch"; + private static final String TEST_NAME4 = "paramserv-nn-asp-epoch"; + private static final String TEST_NAME5 = "paramserv-nn-bsp-batch-drr"; + private static final String TEST_NAME6 = "paramserv-nn-bsp-batch-dr"; + private static final String TEST_NAME7 = "paramserv-nn-bsp-batch-or"; + + private static final String TEST_DIR = "functions/paramserv/"; + private static final String TEST_CLASS_DIR = TEST_DIR + ParamservLocalNNTest.class.getSimpleName() + "/"; + + @Override + public void setUp() { + addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] {})); + addTestConfiguration(TEST_NAME2, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME2, new String[] {})); + addTestConfiguration(TEST_NAME3, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME3, new String[] {})); + addTestConfiguration(TEST_NAME4, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME4, new String[] {})); + addTestConfiguration(TEST_NAME5, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME5, new String[] {})); + addTestConfiguration(TEST_NAME6, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME6, new String[] {})); + addTestConfiguration(TEST_NAME7, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME7, new String[] {})); + } + + @Test + public void testParamservBSPBatchDisjointContiguous() { + runDMLTest(TEST_NAME1); + } + + @Test + public void testParamservASPBatch() { + runDMLTest(TEST_NAME2); + } + + @Test + public void testParamservBSPEpoch() { + runDMLTest(TEST_NAME3); + } + + @Test + public void testParamservASPEpoch() { + runDMLTest(TEST_NAME4); + } + + @Test + public void testParamservBSPBatchDisjointRoundRobin() { + runDMLTest(TEST_NAME5); + } + + @Test + public void testParamservBSPBatchDisjointRandom() { + runDMLTest(TEST_NAME6); + } + + @Test + public void testParamservBSPBatchOverlapReshuffle() { + runDMLTest(TEST_NAME7); + } + + private void runDMLTest(String testname) { + TestConfiguration config = getTestConfiguration(testname); + loadTestConfiguration(config); + programArgs = new String[] { "-explain" }; + String HOME = SCRIPT_DIR + TEST_DIR; + fullDMLScriptName = HOME + testname + ".dml"; + runTest(true, false, null, null, -1); + } +} 
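
A note on the custom tokenizer in the ProgramConverter diff above: HierarchyAwareStringTokenizer splits the serialized program string on a delimiter, but only at the current nesting level; any delimiter that appears between LEVELIN/LEVELOUT markers belongs to a nested component and is skipped. The following minimal sketch shows the same idea with single-character markers '(' and ')' standing in for the multi-character constants; it is an illustrative simplification, not the SystemML class.

import java.util.ArrayList;
import java.util.List;

// Sketch of level-aware splitting: split on ',' only at nesting depth zero;
// commas inside '(' ... ')' belong to a lower hierarchy level and are kept.
class LevelAwareSplitSketch {
    static List<String> split(String in, char delim, char open, char close) {
        List<String> tokens = new ArrayList<>();
        StringBuilder cur = new StringBuilder();
        int depth = 0;
        for (char c : in.toCharArray()) {
            if (c == open) depth++;
            else if (c == close) depth--;
            if (c == delim && depth == 0) {
                tokens.add(cur.toString());
                cur.setLength(0);
            } else {
                cur.append(c);
            }
        }
        tokens.add(cur.toString());
        return tokens;
    }

    public static void main(String[] args) {
        // "b,c" stays together because its comma sits one level deeper
        System.out.println(split("a,(b,c),d", ',', '(', ')'));
        // prints [a, (b,c), d]
    }
}
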
http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/test/java/org/apache/sysml/test/integration/functions/paramserv/ParamservNNTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/paramserv/ParamservNNTest.java b/src/test/java/org/apache/sysml/test/integration/functions/paramserv/ParamservNNTest.java deleted file mode 100644 index d7afc9d..0000000 --- a/src/test/java/org/apache/sysml/test/integration/functions/paramserv/ParamservNNTest.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.sysml.test.integration.functions.paramserv; - -import org.apache.sysml.test.integration.AutomatedTestBase; -import org.apache.sysml.test.integration.TestConfiguration; -import org.junit.Test; - -public class ParamservNNTest extends AutomatedTestBase { - - private static final String TEST_NAME1 = "paramserv-nn-bsp-batch-dc"; - private static final String TEST_NAME2 = "paramserv-nn-asp-batch"; - private static final String TEST_NAME3 = "paramserv-nn-bsp-epoch"; - private static final String TEST_NAME4 = "paramserv-nn-asp-epoch"; - private static final String TEST_NAME5 = "paramserv-nn-bsp-batch-drr"; - private static final String TEST_NAME6 = "paramserv-nn-bsp-batch-dr"; - private static final String TEST_NAME7 = "paramserv-nn-bsp-batch-or"; - - private static final String TEST_DIR = "functions/paramserv/"; - private static final String TEST_CLASS_DIR = TEST_DIR + ParamservNNTest.class.getSimpleName() + "/"; - - private final String HOME = SCRIPT_DIR + TEST_DIR; - - @Override - public void setUp() { - addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] {})); - addTestConfiguration(TEST_NAME2, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME2, new String[] {})); - addTestConfiguration(TEST_NAME3, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME3, new String[] {})); - addTestConfiguration(TEST_NAME4, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME4, new String[] {})); - addTestConfiguration(TEST_NAME5, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME5, new String[] {})); - addTestConfiguration(TEST_NAME6, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME6, new String[] {})); - addTestConfiguration(TEST_NAME7, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME7, new String[] {})); - } - - @Test - public void testParamservBSPBatchDisjointContiguous() { - runDMLTest(TEST_NAME1); - } - - @Test - public void testParamservASPBatch() { - runDMLTest(TEST_NAME2); - } - - @Test - public void testParamservBSPEpoch() { - runDMLTest(TEST_NAME3); - } - - @Test - public void testParamservASPEpoch() { - runDMLTest(TEST_NAME4); - } - - @Test - public void 
testParamservBSPBatchDisjointRoundRobin() { - runDMLTest(TEST_NAME5); - } - - @Test - public void testParamservBSPBatchDisjointRandom() { - runDMLTest(TEST_NAME6); - } - - @Test - public void testParamservBSPBatchOverlapReshuffle() { - runDMLTest(TEST_NAME7); - } - - private void runDMLTest(String testname) { - TestConfiguration config = getTestConfiguration(testname); - loadTestConfiguration(config); - programArgs = new String[] { "-explain" }; - fullDMLScriptName = HOME + testname + ".dml"; - runTest(true, false, null, null, -1); - } -}
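
For context on how the serialized body is consumed: parseSparkPSBody above is the worker-side counterpart of the driver-side serialization, rebuilding a Program and ExecutionContext from the shipped string so that each remote Spark worker can run its update function locally. A hedged sketch of that setup step follows; the getEc() accessor and the taskId parameter are assumptions for illustration, since the quoted diff only shows setEc.

import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysml.runtime.controlprogram.paramserv.spark.SparkPSBody;
import org.apache.sysml.runtime.util.ProgramConverter;

// Sketch: reconstruct a worker-local execution context from the serialized
// PS body shipped by the driver (see parseSparkPSBody above).
class WorkerSetupSketch {
    ExecutionContext setup(String serializedBody, int taskId) {
        // taskId is appended to thread-local names (Lop.CP_CHILD_THREAD + id)
        // so that concurrent workers do not collide on temporary files
        SparkPSBody body = ProgramConverter.parseSparkPSBody(serializedBody, taskId);
        return body.getEc(); // accessor assumed; the diff above only shows setEc(...)
    }
}
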

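
Finally, the accumulator convention used by parseResultVariables above: a serialized result-variable name ending in '+' marks an accumulator, and the suffix is stripped while the accumulator flag is set. A small self-contained sketch of the same convention; the Var pair type here is illustrative, not SystemML's ResultVar.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch of the trailing-'+' accumulator convention for parfor result
// variables: "S+" means accumulate, "R" means plain result variable.
class ResultVarSketch {
    static final class Var {
        final String name; final boolean accum;
        Var(String name, boolean accum) { this.name = name; this.accum = accum; }
        @Override public String toString() { return name + (accum ? " (accum)" : ""); }
    }

    static List<Var> parse(List<String> serialized) {
        List<Var> ret = new ArrayList<>();
        for (String var : serialized) {
            boolean accum = var.endsWith("+");
            ret.add(new Var(accum ? var.substring(0, var.length() - 1) : var, accum));
        }
        return ret;
    }

    public static void main(String[] args) {
        System.out.println(parse(Arrays.asList("R", "S+")));
        // prints [R, S (accum)]
    }
}
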