http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java deleted file mode 100644 index 919b357..0000000 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java +++ /dev/null @@ -1,1866 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.sysml.runtime.controlprogram.parfor; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.StringTokenizer; -import java.util.Map.Entry; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.mapred.JobConf; -import org.apache.sysml.api.DMLScript; -import org.apache.sysml.conf.CompilerConfig.ConfigType; -import org.apache.sysml.conf.CompilerConfig; -import org.apache.sysml.conf.ConfigurationManager; -import org.apache.sysml.conf.DMLConfig; -import org.apache.sysml.hops.Hop; -import org.apache.sysml.hops.OptimizerUtils; -import org.apache.sysml.hops.recompile.Recompiler; -import org.apache.sysml.parser.DMLProgram; -import org.apache.sysml.parser.DataIdentifier; -import org.apache.sysml.parser.ForStatementBlock; -import org.apache.sysml.parser.IfStatementBlock; -import org.apache.sysml.parser.ParForStatementBlock; -import org.apache.sysml.parser.StatementBlock; -import org.apache.sysml.parser.Expression.DataType; -import org.apache.sysml.parser.Expression.ValueType; -import org.apache.sysml.parser.ParForStatementBlock.ResultVar; -import org.apache.sysml.parser.WhileStatementBlock; -import org.apache.sysml.runtime.DMLRuntimeException; -import org.apache.sysml.runtime.codegen.CodegenUtils; -import org.apache.sysml.runtime.controlprogram.ExternalFunctionProgramBlock; -import org.apache.sysml.runtime.controlprogram.ExternalFunctionProgramBlockCP; -import org.apache.sysml.runtime.controlprogram.ForProgramBlock; -import org.apache.sysml.runtime.controlprogram.FunctionProgramBlock; -import org.apache.sysml.runtime.controlprogram.IfProgramBlock; -import org.apache.sysml.runtime.controlprogram.LocalVariableMap; -import org.apache.sysml.runtime.controlprogram.ParForProgramBlock; -import org.apache.sysml.runtime.controlprogram.Program; -import org.apache.sysml.runtime.controlprogram.ProgramBlock; -import org.apache.sysml.runtime.controlprogram.WhileProgramBlock; -import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat; -import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PExecMode; -import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PartitionFormat; -import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; -import org.apache.sysml.runtime.controlprogram.caching.MatrixObject.UpdateType; -import org.apache.sysml.runtime.controlprogram.context.ExecutionContext; -import org.apache.sysml.runtime.controlprogram.context.ExecutionContextFactory; -import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; -import org.apache.sysml.runtime.instructions.CPInstructionParser; -import org.apache.sysml.runtime.instructions.Instruction; -import org.apache.sysml.runtime.instructions.InstructionParser; -import org.apache.sysml.runtime.instructions.MRJobInstruction; -import org.apache.sysml.runtime.instructions.cp.BooleanObject; -import org.apache.sysml.runtime.instructions.cp.CPInstruction; -import org.apache.sysml.runtime.instructions.cp.Data; -import org.apache.sysml.runtime.instructions.cp.DoubleObject; -import org.apache.sysml.runtime.instructions.cp.FunctionCallCPInstruction; -import org.apache.sysml.runtime.instructions.cp.IntObject; -import org.apache.sysml.runtime.instructions.cp.ScalarObject; -import org.apache.sysml.runtime.instructions.cp.SpoofCPInstruction; -import org.apache.sysml.runtime.instructions.cp.StringObject; -import org.apache.sysml.runtime.instructions.cp.VariableCPInstruction; -import org.apache.sysml.runtime.instructions.gpu.GPUInstruction; -import org.apache.sysml.runtime.instructions.mr.MRInstruction; -import org.apache.sysml.runtime.instructions.spark.SPInstruction; -import org.apache.sysml.runtime.matrix.MatrixCharacteristics; -import org.apache.sysml.runtime.matrix.MetaDataFormat; -import org.apache.sysml.runtime.matrix.data.InputInfo; -import org.apache.sysml.runtime.matrix.data.MatrixBlock; -import org.apache.sysml.runtime.matrix.data.OutputInfo; -import org.apache.sysml.udf.ExternalFunctionInvocationInstruction; - -/** - * Program converter functionalities for - * (1) creating deep copies of program blocks, instructions, function program blocks, and - * (2) serializing and parsing of programs, program blocks, functions program blocks. - * - */ -//TODO: rewrite class to instance-based invocation (grown gradually and now inappropriate design) -public class ProgramConverter -{ - protected static final Log LOG = LogFactory.getLog(ProgramConverter.class.getName()); - - //use escaped unicodes for separators in order to prevent string conflict - public static final String NEWLINE = "\n"; //System.lineSeparator(); - public static final String COMPONENTS_DELIM = "\u236e"; //semicolon w/ bar; ";"; - public static final String ELEMENT_DELIM = "\u236a"; //comma w/ bar; ","; - public static final String DATA_FIELD_DELIM = "\u007c"; //"|"; - public static final String KEY_VALUE_DELIM = "\u003d"; //"="; - public static final String LEVELIN = "\u23a8"; //variant of left curly bracket; "\u007b"; //"{"; - public static final String LEVELOUT = "\u23ac"; //variant of right curly bracket; "\u007d"; //"}"; - public static final String EMPTY = "null"; - - //public static final String CP_ROOT_THREAD_SEPARATOR = "/";//File.separator; - public static final String CP_ROOT_THREAD_ID = "_t0"; - public static final String CP_CHILD_THREAD = "_t"; - - public static final String PARFOR_CDATA_BEGIN = "<![CDATA["; - public static final String PARFOR_CDATA_END = " ]]>"; - - public static final String PARFOR_PROG_BEGIN = " PROG" + LEVELIN; - public static final String PARFOR_PROG_END = LEVELOUT; - public static final String PARFORBODY_BEGIN = PARFOR_CDATA_BEGIN+"PARFORBODY" + LEVELIN; - public static final String PARFORBODY_END = LEVELOUT+PARFOR_CDATA_END; - public static final String PARFOR_VARS_BEGIN = "VARS: "; - public static final String PARFOR_VARS_END = ""; - public static final String PARFOR_PBS_BEGIN = " PBS" + LEVELIN; - public static final String PARFOR_PBS_END = LEVELOUT; - public static final String PARFOR_INST_BEGIN = " INST: "; - public static final String PARFOR_INST_END = ""; - public static final String PARFOR_EC_BEGIN = " EC: "; - public static final String PARFOR_EC_END = ""; - public static final String PARFOR_PB_BEGIN = " PB" + LEVELIN; - public static final String PARFOR_PB_END = LEVELOUT; - public static final String PARFOR_PB_WHILE = " WHILE" + LEVELIN; - public static final String PARFOR_PB_FOR = " FOR" + LEVELIN; - public static final String PARFOR_PB_PARFOR = " PARFOR" + LEVELIN; - public static final String PARFOR_PB_IF = " IF" + LEVELIN; - public static final String PARFOR_PB_FC = " FC" + LEVELIN; - public static final String PARFOR_PB_EFC = " EFC" + LEVELIN; - - public static final String PARFOR_CONF_STATS = "stats"; - - - //exception msgs - public static final String NOT_SUPPORTED_EXTERNALFUNCTION_PB = "Not supported: ExternalFunctionProgramBlock contains MR instructions. " + - "(ExternalFunctionPRogramBlockCP can be used)"; - public static final String NOT_SUPPORTED_MR_INSTRUCTION = "Not supported: Instructions of type other than CP instructions"; - public static final String NOT_SUPPORTED_MR_PARFOR = "Not supported: Nested ParFOR REMOTE_MR due to possible deadlocks." + - "(LOCAL can be used for innner ParFOR)"; - public static final String NOT_SUPPORTED_PB = "Not supported: type of program block"; - - //////////////////////////////// - // CREATION of DEEP COPIES - //////////////////////////////// - - /** - * Creates a deep copy of the given execution context. - * For rt_platform=Hadoop, execution context has a symbol table. - * - * @param ec execution context - * @return execution context - * @throws CloneNotSupportedException if CloneNotSupportedException occurs - */ - public static ExecutionContext createDeepCopyExecutionContext(ExecutionContext ec) - throws CloneNotSupportedException - { - ExecutionContext cpec = ExecutionContextFactory.createContext(false, ec.getProgram()); - cpec.setVariables((LocalVariableMap) ec.getVariables().clone()); - - //handle result variables with in-place update flag - //(each worker requires its own copy of the empty matrix object) - for( String var : cpec.getVariables().keySet() ) { - Data dat = cpec.getVariables().get(var); - if( dat instanceof MatrixObject && ((MatrixObject)dat).getUpdateType().isInPlace() ) { - MatrixObject mo = (MatrixObject)dat; - MatrixObject moNew = new MatrixObject(mo); - if( mo.getNnz() != 0 ){ - // If output matrix is not empty (NNZ != 0), then local copy is created so that - // update in place operation can be applied. - MatrixBlock mbVar = mo.acquireRead(); - moNew.acquireModify (new MatrixBlock(mbVar)); - mo.release(); - } else { - //create empty matrix block w/ dense representation (preferred for update in-place) - //Creating a dense matrix block is valid because empty block not allocated and transfer - // to sparse representation happens in left indexing in place operation. - moNew.acquireModify(new MatrixBlock((int)mo.getNumRows(), (int)mo.getNumColumns(), false)); - } - moNew.release(); - cpec.setVariable(var, moNew); - } - } - - return cpec; - } - - /** - * This recursively creates a deep copy of program blocks and transparently replaces filenames according to the - * specified parallel worker in order to avoid conflicts between parworkers. This happens recursively in order - * to support arbitrary control-flow constructs within a parfor. - * - * @param childBlocks child program blocks - * @param pid ? - * @param IDPrefix ? - * @param fnStack ? - * @param fnCreated ? - * @param plain if true, full deep copy without id replacement - * @param forceDeepCopy if true, force deep copy - * @return list of program blocks - */ - public static ArrayList<ProgramBlock> rcreateDeepCopyProgramBlocks(ArrayList<ProgramBlock> childBlocks, long pid, int IDPrefix, HashSet<String> fnStack, HashSet<String> fnCreated, boolean plain, boolean forceDeepCopy) - { - ArrayList<ProgramBlock> tmp = new ArrayList<>(); - - for( ProgramBlock pb : childBlocks ) - { - Program prog = pb.getProgram(); - ProgramBlock tmpPB = null; - - if( pb instanceof WhileProgramBlock ) - { - tmpPB = createDeepCopyWhileProgramBlock((WhileProgramBlock) pb, pid, IDPrefix, prog, fnStack, fnCreated, plain, forceDeepCopy); - } - else if( pb instanceof ForProgramBlock && !(pb instanceof ParForProgramBlock) ) - { - tmpPB = createDeepCopyForProgramBlock((ForProgramBlock) pb, pid, IDPrefix, prog, fnStack, fnCreated, plain, forceDeepCopy ); - } - else if( pb instanceof ParForProgramBlock ) - { - ParForProgramBlock pfpb = (ParForProgramBlock) pb; - if( ParForProgramBlock.ALLOW_NESTED_PARALLELISM ) - tmpPB = createDeepCopyParForProgramBlock(pfpb, pid, IDPrefix, prog, fnStack, fnCreated, plain, forceDeepCopy); - else - tmpPB = createDeepCopyForProgramBlock((ForProgramBlock) pb, pid, IDPrefix, prog, fnStack, fnCreated, plain, forceDeepCopy); - } - else if( pb instanceof IfProgramBlock ) - { - tmpPB = createDeepCopyIfProgramBlock((IfProgramBlock) pb, pid, IDPrefix, prog, fnStack, fnCreated, plain, forceDeepCopy); - } - else //last-level program block - { - tmpPB = new ProgramBlock(prog); // general case use for most PBs - - //for recompile in the master node JVM - tmpPB.setStatementBlock(createStatementBlockCopy(pb.getStatementBlock(), pid, plain, forceDeepCopy)); - //tmpPB.setStatementBlock(pb.getStatementBlock()); - tmpPB.setThreadID(pid); - } - - //copy instructions - tmpPB.setInstructions( createDeepCopyInstructionSet(pb.getInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) ); - - //copy symbol table - //tmpPB.setVariables( pb.getVariables() ); //implicit cloning - - tmp.add(tmpPB); - } - - return tmp; - } - - public static WhileProgramBlock createDeepCopyWhileProgramBlock(WhileProgramBlock wpb, long pid, int IDPrefix, Program prog, HashSet<String> fnStack, HashSet<String> fnCreated, boolean plain, boolean forceDeepCopy) { - ArrayList<Instruction> predinst = createDeepCopyInstructionSet(wpb.getPredicate(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true); - WhileProgramBlock tmpPB = new WhileProgramBlock(prog, predinst); - tmpPB.setStatementBlock( createWhileStatementBlockCopy((WhileStatementBlock) wpb.getStatementBlock(), pid, plain, forceDeepCopy) ); - tmpPB.setThreadID(pid); - - tmpPB.setExitInstructions2( createDeepCopyInstructionSet(wpb.getExitInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true)); - tmpPB.setChildBlocks(rcreateDeepCopyProgramBlocks(wpb.getChildBlocks(), pid, IDPrefix, fnStack, fnCreated, plain, forceDeepCopy)); - - return tmpPB; - } - - public static IfProgramBlock createDeepCopyIfProgramBlock(IfProgramBlock ipb, long pid, int IDPrefix, Program prog, HashSet<String> fnStack, HashSet<String> fnCreated, boolean plain, boolean forceDeepCopy) { - ArrayList<Instruction> predinst = createDeepCopyInstructionSet(ipb.getPredicate(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true); - IfProgramBlock tmpPB = new IfProgramBlock(prog, predinst); - tmpPB.setStatementBlock( createIfStatementBlockCopy((IfStatementBlock)ipb.getStatementBlock(), pid, plain, forceDeepCopy ) ); - tmpPB.setThreadID(pid); - - tmpPB.setExitInstructions2( createDeepCopyInstructionSet(ipb.getExitInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true)); - tmpPB.setChildBlocksIfBody(rcreateDeepCopyProgramBlocks(ipb.getChildBlocksIfBody(), pid, IDPrefix, fnStack, fnCreated, plain, forceDeepCopy)); - tmpPB.setChildBlocksElseBody(rcreateDeepCopyProgramBlocks(ipb.getChildBlocksElseBody(), pid, IDPrefix, fnStack, fnCreated, plain, forceDeepCopy)); - - return tmpPB; - } - - public static ForProgramBlock createDeepCopyForProgramBlock(ForProgramBlock fpb, long pid, int IDPrefix, Program prog, HashSet<String> fnStack, HashSet<String> fnCreated, boolean plain, boolean forceDeepCopy) { - ForProgramBlock tmpPB = new ForProgramBlock(prog,fpb.getIterVar()); - tmpPB.setStatementBlock( createForStatementBlockCopy((ForStatementBlock)fpb.getStatementBlock(), pid, plain, forceDeepCopy)); - tmpPB.setThreadID(pid); - - tmpPB.setFromInstructions( createDeepCopyInstructionSet(fpb.getFromInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) ); - tmpPB.setToInstructions( createDeepCopyInstructionSet(fpb.getToInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) ); - tmpPB.setIncrementInstructions( createDeepCopyInstructionSet(fpb.getIncrementInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) ); - tmpPB.setExitInstructions( createDeepCopyInstructionSet(fpb.getExitInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) ); - tmpPB.setChildBlocks( rcreateDeepCopyProgramBlocks(fpb.getChildBlocks(), pid, IDPrefix, fnStack, fnCreated, plain, forceDeepCopy) ); - - return tmpPB; - } - - public static ForProgramBlock createShallowCopyForProgramBlock(ForProgramBlock fpb, Program prog ) { - ForProgramBlock tmpPB = new ForProgramBlock(prog,fpb.getIterVar()); - - tmpPB.setFromInstructions( fpb.getFromInstructions() ); - tmpPB.setToInstructions( fpb.getToInstructions() ); - tmpPB.setIncrementInstructions( fpb.getIncrementInstructions() ); - tmpPB.setExitInstructions( fpb.getExitInstructions() ); - tmpPB.setChildBlocks( fpb.getChildBlocks() ); - - return tmpPB; - } - - public static ParForProgramBlock createDeepCopyParForProgramBlock(ParForProgramBlock pfpb, long pid, int IDPrefix, Program prog, HashSet<String> fnStack, HashSet<String> fnCreated, boolean plain, boolean forceDeepCopy) { - ParForProgramBlock tmpPB = null; - - if( IDPrefix == -1 ) //still on master node - tmpPB = new ParForProgramBlock(prog,pfpb.getIterVar(), pfpb.getParForParams(), pfpb.getResultVariables()); - else //child of remote ParWorker at any level - tmpPB = new ParForProgramBlock(IDPrefix, prog, pfpb.getIterVar(), pfpb.getParForParams(), pfpb.getResultVariables()); - - tmpPB.setStatementBlock( createForStatementBlockCopy( (ForStatementBlock) pfpb.getStatementBlock(), pid, plain, forceDeepCopy) ); - tmpPB.setThreadID(pid); - - tmpPB.disableOptimization(); //already done in top-level parfor - tmpPB.disableMonitorReport(); //already done in top-level parfor - - tmpPB.setFromInstructions( createDeepCopyInstructionSet(pfpb.getFromInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) ); - tmpPB.setToInstructions( createDeepCopyInstructionSet(pfpb.getToInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) ); - tmpPB.setIncrementInstructions( createDeepCopyInstructionSet(pfpb.getIncrementInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) ); - tmpPB.setExitInstructions( createDeepCopyInstructionSet(pfpb.getExitInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) ); - - //NOTE: Normally, no recursive copy because (1) copied on each execution in this PB anyway - //and (2) leave placeholders as they are. However, if plain, an explicit deep copy is requested. - if( plain || forceDeepCopy ) - tmpPB.setChildBlocks( rcreateDeepCopyProgramBlocks(pfpb.getChildBlocks(), pid, IDPrefix, fnStack, fnCreated, plain, forceDeepCopy) ); - else - tmpPB.setChildBlocks( pfpb.getChildBlocks() ); - - return tmpPB; - } - - /** - * This creates a deep copy of a function program block. The central reference to singletons of function program blocks - * poses the need for explicit copies in order to prevent conflicting writes of temporary variables (see ExternalFunctionProgramBlock. - * - * @param namespace function namespace - * @param oldName ? - * @param pid ? - * @param IDPrefix ? - * @param prog runtime program - * @param fnStack ? - * @param fnCreated ? - * @param plain ? - */ - public static void createDeepCopyFunctionProgramBlock(String namespace, String oldName, long pid, int IDPrefix, Program prog, HashSet<String> fnStack, HashSet<String> fnCreated, boolean plain) - { - //fpb guaranteed to be non-null (checked inside getFunctionProgramBlock) - FunctionProgramBlock fpb = prog.getFunctionProgramBlock(namespace, oldName); - String fnameNew = (plain)? oldName :(oldName+CP_CHILD_THREAD+pid); - String fnameNewKey = DMLProgram.constructFunctionKey(namespace,fnameNew); - - if( prog.getFunctionProgramBlocks().containsKey(fnameNewKey) ) - return; //prevent redundant deep copy if already existent - - //create deep copy - FunctionProgramBlock copy = null; - ArrayList<DataIdentifier> tmp1 = new ArrayList<>(); - ArrayList<DataIdentifier> tmp2 = new ArrayList<>(); - if( fpb.getInputParams()!= null ) - tmp1.addAll(fpb.getInputParams()); - if( fpb.getOutputParams()!= null ) - tmp2.addAll(fpb.getOutputParams()); - - if( fpb instanceof ExternalFunctionProgramBlockCP ) - { - ExternalFunctionProgramBlockCP efpb = (ExternalFunctionProgramBlockCP) fpb; - HashMap<String,String> tmp3 = efpb.getOtherParams(); - if( IDPrefix!=-1 ) - copy = new ExternalFunctionProgramBlockCP(prog,tmp1,tmp2,tmp3,saveReplaceFilenameThreadID(efpb.getBaseDir(),CP_CHILD_THREAD+IDPrefix,CP_CHILD_THREAD+pid)); - else - copy = new ExternalFunctionProgramBlockCP(prog,tmp1,tmp2,tmp3,saveReplaceFilenameThreadID(efpb.getBaseDir(),CP_ROOT_THREAD_ID,CP_CHILD_THREAD+pid)); - } - else if( fpb instanceof ExternalFunctionProgramBlock ) - { - ExternalFunctionProgramBlock efpb = (ExternalFunctionProgramBlock) fpb; - HashMap<String,String> tmp3 = efpb.getOtherParams(); - if( IDPrefix!=-1 ) - copy = new ExternalFunctionProgramBlock(prog,tmp1,tmp2,tmp3,saveReplaceFilenameThreadID(efpb.getBaseDir(),CP_CHILD_THREAD+IDPrefix, CP_CHILD_THREAD+pid)); - else - copy = new ExternalFunctionProgramBlock(prog,tmp1,tmp2,tmp3,saveReplaceFilenameThreadID(efpb.getBaseDir(),CP_ROOT_THREAD_ID, CP_CHILD_THREAD+pid)); - } - else - { - if( !fnStack.contains(fnameNewKey) ) { - fnStack.add(fnameNewKey); - copy = new FunctionProgramBlock(prog, tmp1, tmp2); - copy.setChildBlocks( rcreateDeepCopyProgramBlocks(fpb.getChildBlocks(), pid, IDPrefix, fnStack, fnCreated, plain, fpb.isRecompileOnce()) ); - copy.setRecompileOnce( fpb.isRecompileOnce() ); - copy.setThreadID(pid); - fnStack.remove(fnameNewKey); - } - else //stop deep copy for recursive function calls - copy = fpb; - } - - //copy.setVariables( (LocalVariableMap) fpb.getVariables() ); //implicit cloning - //note: instructions not used by function program block - - //put - prog.addFunctionProgramBlock(namespace, fnameNew, copy); - fnCreated.add(DMLProgram.constructFunctionKey(namespace, fnameNew)); - } - - public static FunctionProgramBlock createDeepCopyFunctionProgramBlock(FunctionProgramBlock fpb, HashSet<String> fnStack, HashSet<String> fnCreated) - { - if( fpb == null ) - throw new DMLRuntimeException("Unable to create a deep copy of a non-existing FunctionProgramBlock."); - - //create deep copy - FunctionProgramBlock copy = null; - ArrayList<DataIdentifier> tmp1 = new ArrayList<>(); - ArrayList<DataIdentifier> tmp2 = new ArrayList<>(); - if( fpb.getInputParams()!= null ) - tmp1.addAll(fpb.getInputParams()); - if( fpb.getOutputParams()!= null ) - tmp2.addAll(fpb.getOutputParams()); - - copy = new FunctionProgramBlock(fpb.getProgram(), tmp1, tmp2); - copy.setChildBlocks( rcreateDeepCopyProgramBlocks(fpb.getChildBlocks(), 0, -1, fnStack, fnCreated, true, fpb.isRecompileOnce()) ); - copy.setStatementBlock( fpb.getStatementBlock() ); - copy.setRecompileOnce(fpb.isRecompileOnce()); - //copy.setVariables( (LocalVariableMap) fpb.getVariables() ); //implicit cloning - //note: instructions not used by function program block - - return copy; - } - - - /** - * Creates a deep copy of an array of instructions and replaces the placeholders of parworker - * IDs with the concrete IDs of this parfor instance. This is a helper method uses for generating - * deep copies of program blocks. - * - * @param instSet list of instructions - * @param pid ? - * @param IDPrefix ? - * @param prog runtime program - * @param fnStack ? - * @param fnCreated ? - * @param plain ? - * @param cpFunctions ? - * @return list of instructions - */ - public static ArrayList<Instruction> createDeepCopyInstructionSet(ArrayList<Instruction> instSet, long pid, int IDPrefix, Program prog, HashSet<String> fnStack, HashSet<String> fnCreated, boolean plain, boolean cpFunctions) - { - ArrayList<Instruction> tmp = new ArrayList<>(); - for( Instruction inst : instSet ) - { - if( inst instanceof FunctionCallCPInstruction && cpFunctions ) - { - FunctionCallCPInstruction finst = (FunctionCallCPInstruction) inst; - createDeepCopyFunctionProgramBlock( finst.getNamespace(), - finst.getFunctionName(), - pid, IDPrefix, prog, fnStack, fnCreated, plain ); - } - - tmp.add( cloneInstruction( inst, pid, plain, cpFunctions ) ); - } - - return tmp; - } - - public static Instruction cloneInstruction( Instruction oInst, long pid, boolean plain, boolean cpFunctions ) - { - Instruction inst = null; - String tmpString = oInst.toString(); - - try - { - if( oInst instanceof CPInstruction || oInst instanceof SPInstruction || oInst instanceof MRInstruction - || oInst instanceof GPUInstruction ) - { - if( oInst instanceof FunctionCallCPInstruction && cpFunctions ) - { - FunctionCallCPInstruction tmp = (FunctionCallCPInstruction) oInst; - if( !plain ) - { - //safe replacement because target variables might include the function name - //note: this is no update-in-place in order to keep the original function name as basis - tmpString = tmp.updateInstStringFunctionName(tmp.getFunctionName(), tmp.getFunctionName() + CP_CHILD_THREAD+pid); - } - //otherwise: preserve function name - } - - inst = InstructionParser.parseSingleInstruction(tmpString); - } - else if( oInst instanceof MRJobInstruction ) - { - //clone via copy constructor - inst = new MRJobInstruction( (MRJobInstruction)oInst ); - } - else - throw new DMLRuntimeException("Failed to clone instruction: "+oInst); - } - catch(Exception ex) - { - throw new DMLRuntimeException(ex); - } - - //save replacement of thread id references in instructions - inst = saveReplaceThreadID( inst, ProgramConverter.CP_ROOT_THREAD_ID, - ProgramConverter.CP_CHILD_THREAD+pid); - - return inst; - } - - public static StatementBlock createStatementBlockCopy( StatementBlock sb, long pid, boolean plain, boolean forceDeepCopy ) - { - StatementBlock ret = null; - - try - { - if( ConfigurationManager.getCompilerConfigFlag(ConfigType.ALLOW_PARALLEL_DYN_RECOMPILATION) - && sb != null //forced deep copy for function recompilation - && (Recompiler.requiresRecompilation( sb.getHops() ) || forceDeepCopy) ) - { - //create new statement (shallow copy livein/liveout for recompile, line numbers for explain) - ret = new StatementBlock(); - ret.setDMLProg(sb.getDMLProg()); - ret.setParseInfo(sb); - ret.setLiveIn( sb.liveIn() ); - ret.setLiveOut( sb.liveOut() ); - ret.setUpdatedVariables( sb.variablesUpdated() ); - ret.setReadVariables( sb.variablesRead() ); - - //deep copy hops dag for concurrent recompile - ArrayList<Hop> hops = Recompiler.deepCopyHopsDag( sb.getHops() ); - if( !plain ) - Recompiler.updateFunctionNames( hops, pid ); - ret.setHops( hops ); - ret.updateRecompilationFlag(); - } - else - { - ret = sb; - } - } - catch( Exception ex ) - { - throw new DMLRuntimeException( ex ); - } - - return ret; - } - - public static IfStatementBlock createIfStatementBlockCopy( IfStatementBlock sb, long pid, boolean plain, boolean forceDeepCopy ) - { - IfStatementBlock ret = null; - - try - { - if( ConfigurationManager.getCompilerConfigFlag(ConfigType.ALLOW_PARALLEL_DYN_RECOMPILATION) - && sb != null //forced deep copy for function recompile - && (Recompiler.requiresRecompilation( sb.getPredicateHops() ) || forceDeepCopy) ) - { - //create new statement (shallow copy livein/liveout for recompile, line numbers for explain) - ret = new IfStatementBlock(); - ret.setDMLProg(sb.getDMLProg()); - ret.setParseInfo(sb); - ret.setLiveIn( sb.liveIn() ); - ret.setLiveOut( sb.liveOut() ); - ret.setUpdatedVariables( sb.variablesUpdated() ); - ret.setReadVariables( sb.variablesRead() ); - - //shallow copy child statements - ret.setStatements( sb.getStatements() ); - - //deep copy predicate hops dag for concurrent recompile - Hop hops = Recompiler.deepCopyHopsDag( sb.getPredicateHops() ); - ret.setPredicateHops( hops ); - ret.updatePredicateRecompilationFlag(); - } - else - { - ret = sb; - } - } - catch( Exception ex ) - { - throw new DMLRuntimeException( ex ); - } - - return ret; - } - - public static WhileStatementBlock createWhileStatementBlockCopy( WhileStatementBlock sb, long pid, boolean plain, boolean forceDeepCopy ) - { - WhileStatementBlock ret = null; - - try - { - if( ConfigurationManager.getCompilerConfigFlag(ConfigType.ALLOW_PARALLEL_DYN_RECOMPILATION) - && sb != null //forced deep copy for function recompile - && (Recompiler.requiresRecompilation( sb.getPredicateHops() ) || forceDeepCopy) ) - { - //create new statement (shallow copy livein/liveout for recompile, line numbers for explain) - ret = new WhileStatementBlock(); - ret.setDMLProg(sb.getDMLProg()); - ret.setParseInfo(sb); - ret.setLiveIn( sb.liveIn() ); - ret.setLiveOut( sb.liveOut() ); - ret.setUpdatedVariables( sb.variablesUpdated() ); - ret.setReadVariables( sb.variablesRead() ); - ret.setUpdateInPlaceVars( sb.getUpdateInPlaceVars() ); - - //shallow copy child statements - ret.setStatements( sb.getStatements() ); - - //deep copy predicate hops dag for concurrent recompile - Hop hops = Recompiler.deepCopyHopsDag( sb.getPredicateHops() ); - ret.setPredicateHops( hops ); - ret.updatePredicateRecompilationFlag(); - } - else - { - ret = sb; - } - } - catch( Exception ex ) - { - throw new DMLRuntimeException( ex ); - } - - return ret; - } - - public static ForStatementBlock createForStatementBlockCopy( ForStatementBlock sb, long pid, boolean plain, boolean forceDeepCopy ) - { - ForStatementBlock ret = null; - - try - { - if( ConfigurationManager.getCompilerConfigFlag(ConfigType.ALLOW_PARALLEL_DYN_RECOMPILATION) - && sb != null - && ( Recompiler.requiresRecompilation(sb.getFromHops()) || - Recompiler.requiresRecompilation(sb.getToHops()) || - Recompiler.requiresRecompilation(sb.getIncrementHops()) || - forceDeepCopy ) ) - { - ret = (sb instanceof ParForStatementBlock) ? new ParForStatementBlock() : new ForStatementBlock(); - - //create new statement (shallow copy livein/liveout for recompile, line numbers for explain) - ret.setDMLProg(sb.getDMLProg()); - ret.setParseInfo(sb); - ret.setLiveIn( sb.liveIn() ); - ret.setLiveOut( sb.liveOut() ); - ret.setUpdatedVariables( sb.variablesUpdated() ); - ret.setReadVariables( sb.variablesRead() ); - ret.setUpdateInPlaceVars( sb.getUpdateInPlaceVars() ); - - //shallow copy child statements - ret.setStatements( sb.getStatements() ); - - //deep copy predicate hops dag for concurrent recompile - if( sb.requiresFromRecompilation() ){ - Hop hops = Recompiler.deepCopyHopsDag( sb.getFromHops() ); - ret.setFromHops( hops ); - } - if( sb.requiresToRecompilation() ){ - Hop hops = Recompiler.deepCopyHopsDag( sb.getToHops() ); - ret.setToHops( hops ); - } - if( sb.requiresIncrementRecompilation() ){ - Hop hops = Recompiler.deepCopyHopsDag( sb.getIncrementHops() ); - ret.setIncrementHops( hops ); - } - ret.updatePredicateRecompilationFlags(); - } - else - { - ret = sb; - } - } - catch( Exception ex ) - { - throw new DMLRuntimeException( ex ); - } - - return ret; - } - - - //////////////////////////////// - // SERIALIZATION - //////////////////////////////// - - public static String serializeParForBody( ParForBody body ) { - return serializeParForBody(body, new HashMap<String, byte[]>()); - } - - public static String serializeParForBody( ParForBody body, HashMap<String,byte[]> clsMap ) - { - ArrayList<ProgramBlock> pbs = body.getChildBlocks(); - ArrayList<ResultVar> rVnames = body.getResultVariables(); - ExecutionContext ec = body.getEc(); - - if( pbs.isEmpty() ) - return PARFORBODY_BEGIN + PARFORBODY_END; - - Program prog = pbs.get( 0 ).getProgram(); - - StringBuilder sb = new StringBuilder(); - sb.append( PARFORBODY_BEGIN ); - sb.append( NEWLINE ); - - //handle DMLScript UUID (propagate original uuid for writing to scratch space) - sb.append( DMLScript.getUUID() ); - sb.append( COMPONENTS_DELIM ); - sb.append( NEWLINE ); - - //handle DML config - sb.append( ConfigurationManager.getDMLConfig().serializeDMLConfig() ); - sb.append( COMPONENTS_DELIM ); - sb.append( NEWLINE ); - - //handle additional configurations - sb.append( PARFOR_CONF_STATS + "=" + DMLScript.STATISTICS ); - sb.append( COMPONENTS_DELIM ); - sb.append( NEWLINE ); - - //handle program - sb.append( PARFOR_PROG_BEGIN ); - sb.append( NEWLINE ); - sb.append( serializeProgram(prog, pbs, clsMap) ); - sb.append( PARFOR_PROG_END ); - sb.append( NEWLINE ); - sb.append( COMPONENTS_DELIM ); - sb.append( NEWLINE ); - - //handle result variable names - sb.append( serializeResultVariables(rVnames) ); - sb.append( COMPONENTS_DELIM ); - - //handle execution context - //note: this includes also the symbol table (serialize only the top-level variable map, - // (symbol tables for nested/child blocks are created at parse time, on the remote side) - sb.append( PARFOR_EC_BEGIN ); - sb.append( serializeExecutionContext(ec) ); - sb.append( PARFOR_EC_END ); - sb.append( NEWLINE ); - sb.append( COMPONENTS_DELIM ); - sb.append( NEWLINE ); - - //handle program blocks -- ONLY instructions, not variables. - sb.append( PARFOR_PBS_BEGIN ); - sb.append( NEWLINE ); - sb.append( rSerializeProgramBlocks(pbs, clsMap) ); - sb.append( PARFOR_PBS_END ); - sb.append( NEWLINE ); - - sb.append( PARFORBODY_END ); - - return sb.toString(); - } - - private static String serializeProgram( Program prog, ArrayList<ProgramBlock> pbs, HashMap<String, byte[]> clsMap ) { - //note program contains variables, programblocks and function program blocks - //but in order to avoid redundancy, we only serialize function program blocks - HashMap<String, FunctionProgramBlock> fpb = prog.getFunctionProgramBlocks(); - HashSet<String> cand = new HashSet<>(); - rFindSerializationCandidates(pbs, cand); - return rSerializeFunctionProgramBlocks( fpb, cand, clsMap ); - } - - private static void rFindSerializationCandidates( ArrayList<ProgramBlock> pbs, HashSet<String> cand ) - { - for( ProgramBlock pb : pbs ) - { - if( pb instanceof WhileProgramBlock ) - { - WhileProgramBlock wpb = (WhileProgramBlock) pb; - rFindSerializationCandidates(wpb.getChildBlocks(), cand ); - } - else if ( pb instanceof ForProgramBlock || pb instanceof ParForProgramBlock ) - { - ForProgramBlock fpb = (ForProgramBlock) pb; - rFindSerializationCandidates(fpb.getChildBlocks(), cand); - } - else if ( pb instanceof IfProgramBlock ) - { - IfProgramBlock ipb = (IfProgramBlock) pb; - rFindSerializationCandidates(ipb.getChildBlocksIfBody(), cand); - if( ipb.getChildBlocksElseBody() != null ) - rFindSerializationCandidates(ipb.getChildBlocksElseBody(), cand); - } - else //all generic program blocks - { - for( Instruction inst : pb.getInstructions() ) - if( inst instanceof FunctionCallCPInstruction ) - { - FunctionCallCPInstruction fci = (FunctionCallCPInstruction) inst; - String fkey = DMLProgram.constructFunctionKey(fci.getNamespace(), fci.getFunctionName()); - if( !cand.contains(fkey) ) //memoization for multiple calls, recursion - { - cand.add( fkey ); //add to candidates - - //investigate chains of function calls - FunctionProgramBlock fpb = pb.getProgram().getFunctionProgramBlock(fci.getNamespace(), fci.getFunctionName()); - rFindSerializationCandidates(fpb.getChildBlocks(), cand); - } - } - } - } - } - - private static String serializeVariables (LocalVariableMap vars) { - StringBuilder sb = new StringBuilder(); - sb.append( PARFOR_VARS_BEGIN ); - sb.append( vars.serialize() ); - sb.append( PARFOR_VARS_END ); - return sb.toString(); - } - - public static String serializeDataObject(String key, Data dat) - { - // SCHEMA: <name>|<datatype>|<valuetype>|value - // (scalars are serialize by value, matrices by filename) - StringBuilder sb = new StringBuilder(); - //prepare data for serialization - String name = key; - DataType datatype = dat.getDataType(); - ValueType valuetype = dat.getValueType(); - String value = null; - String[] matrixMetaData = null; - switch( datatype ) - { - case SCALAR: - ScalarObject so = (ScalarObject) dat; - //name = so.getName(); - value = so.getStringValue(); - break; - case MATRIX: - MatrixObject mo = (MatrixObject) dat; - MetaDataFormat md = (MetaDataFormat) dat.getMetaData(); - MatrixCharacteristics mc = md.getMatrixCharacteristics(); - value = mo.getFileName(); - PartitionFormat partFormat = (mo.getPartitionFormat()!=null) ? new PartitionFormat( - mo.getPartitionFormat(),mo.getPartitionSize()) : PartitionFormat.NONE; - matrixMetaData = new String[9]; - matrixMetaData[0] = String.valueOf( mc.getRows() ); - matrixMetaData[1] = String.valueOf( mc.getCols() ); - matrixMetaData[2] = String.valueOf( mc.getRowsPerBlock() ); - matrixMetaData[3] = String.valueOf( mc.getColsPerBlock() ); - matrixMetaData[4] = String.valueOf( mc.getNonZeros() ); - matrixMetaData[5] = InputInfo.inputInfoToString( md.getInputInfo() ); - matrixMetaData[6] = OutputInfo.outputInfoToString( md.getOutputInfo() ); - matrixMetaData[7] = String.valueOf( partFormat ); - matrixMetaData[8] = String.valueOf( mo.getUpdateType() ); - break; - default: - throw new DMLRuntimeException("Unable to serialize datatype "+datatype); - } - - //serialize data - sb.append(name); - sb.append(DATA_FIELD_DELIM); - sb.append(datatype); - sb.append(DATA_FIELD_DELIM); - sb.append(valuetype); - sb.append(DATA_FIELD_DELIM); - sb.append(value); - if( matrixMetaData != null ) - for( int i=0; i<matrixMetaData.length; i++ ) - { - sb.append(DATA_FIELD_DELIM); - sb.append(matrixMetaData[i]); - } - - return sb.toString(); - } - - private static String serializeExecutionContext( ExecutionContext ec ) { - return (ec != null) ? serializeVariables( ec.getVariables() ) : EMPTY; - } - - @SuppressWarnings("all") - private static String serializeInstructions( ArrayList<Instruction> inst, HashMap<String, byte[]> clsMap ) - { - StringBuilder sb = new StringBuilder(); - int count = 0; - for( Instruction linst : inst ) - { - //check that only cp instruction are transmitted - if( !( linst instanceof CPInstruction || linst instanceof ExternalFunctionInvocationInstruction ) ) - throw new DMLRuntimeException( NOT_SUPPORTED_MR_INSTRUCTION + " " +linst.getClass().getName()+"\n"+linst ); - - //obtain serialized version of generated classes - if( linst instanceof SpoofCPInstruction ) { - Class<?> cla = ((SpoofCPInstruction) linst).getOperatorClass(); - clsMap.put(cla.getName(), CodegenUtils.getClassData(cla.getName())); - } - - if( count > 0 ) - sb.append( ELEMENT_DELIM ); - - sb.append( checkAndReplaceLiterals( linst.toString() ) ); - count++; - } - - return sb.toString(); - } - - /** - * Replacement of internal delimiters occurring in literals of instructions - * in order to ensure robustness of serialization and parsing. - * (e.g. print( "a,b" ) would break the parsing of instruction that internally - * are separated with a "," ) - * - * @param instStr instruction string - * @return instruction string with replacements - */ - private static String checkAndReplaceLiterals( String instStr ) - { - String tmp = instStr; - - //1) check own delimiters (very unlikely due to special characters) - if( tmp.contains(COMPONENTS_DELIM) ) { - tmp = tmp.replaceAll(COMPONENTS_DELIM, "."); - LOG.warn("Replaced special literal character sequence "+COMPONENTS_DELIM+" with '.'"); - } - - if( tmp.contains(ELEMENT_DELIM) ) { - tmp = tmp.replaceAll(ELEMENT_DELIM, "."); - LOG.warn("Replaced special literal character sequence "+ELEMENT_DELIM+" with '.'"); - } - - if( tmp.contains( LEVELIN ) ){ - tmp = tmp.replaceAll(LEVELIN, "("); // '\\' required if LEVELIN='{' because regex - LOG.warn("Replaced special literal character sequence "+LEVELIN+" with '('"); - } - - if( tmp.contains(LEVELOUT) ){ - tmp = tmp.replaceAll(LEVELOUT, ")"); - LOG.warn("Replaced special literal character sequence "+LEVELOUT+" with ')'"); - } - - //NOTE: DATA_FIELD_DELIM and KEY_VALUE_DELIM not required - //because those literals cannot occur in critical places. - - //2) check end tag of CDATA - if( tmp.contains(PARFOR_CDATA_END) ){ - tmp = tmp.replaceAll(PARFOR_CDATA_END, "."); //prevent XML parsing issues in job.xml - LOG.warn("Replaced special literal character sequence "+PARFOR_CDATA_END+" with '.'"); - } - - return tmp; - } - - private static String serializeStringHashMap( HashMap<String,String> vars) - { - StringBuilder sb = new StringBuilder(); - int count=0; - for( Entry<String,String> e : vars.entrySet() ) - { - if(count>0) - sb.append( ELEMENT_DELIM ); - sb.append( e.getKey() ); - sb.append( KEY_VALUE_DELIM ); - sb.append( e.getValue() ); - count++; - } - return sb.toString(); - } - - public static String serializeStringCollection( Collection<String> set) - { - StringBuilder sb = new StringBuilder(); - int count=0; - for( String s : set ) - { - if(count>0) - sb.append( ", " ); - sb.append( s ); - count++; - } - return sb.toString(); - } - - public static String serializeResultVariables( ArrayList<ResultVar> vars) { - StringBuilder sb = new StringBuilder(); - int count=0; - for( ResultVar var : vars ) { - if(count>0) - sb.append( ELEMENT_DELIM ); - sb.append( var._isAccum ? var._name+"+" : var._name ); - count++; - } - return sb.toString(); - } - - public static String serializeStringArrayList( ArrayList<String> vars) - { - StringBuilder sb = new StringBuilder(); - int count=0; - for( String s : vars ) - { - if(count>0) - sb.append( ELEMENT_DELIM ); - sb.append( s ); - count++; - } - return sb.toString(); - } - - private static String serializeDataIdentifiers( ArrayList<DataIdentifier> var) - { - StringBuilder sb = new StringBuilder(); - int count=0; - for( DataIdentifier dat : var ) - { - if(count>0) - sb.append( ELEMENT_DELIM ); - sb.append( serializeDataIdentifier(dat) ); - count++; - } - return sb.toString(); - } - - private static String serializeDataIdentifier( DataIdentifier dat ) { - // SCHEMA: <name>|<datatype>|<valuetype> - StringBuilder sb = new StringBuilder(); - sb.append(dat.getName()); - sb.append(DATA_FIELD_DELIM); - sb.append(dat.getDataType()); - sb.append(DATA_FIELD_DELIM); - sb.append(dat.getValueType()); - - return sb.toString(); - } - - private static String rSerializeFunctionProgramBlocks(HashMap<String,FunctionProgramBlock> pbs, HashSet<String> cand, HashMap<String, byte[]> clsMap) - { - StringBuilder sb = new StringBuilder(); - - int count = 0; - for( Entry<String,FunctionProgramBlock> pb : pbs.entrySet() ) - { - if( !cand.contains(pb.getKey()) ) //skip function not included in the parfor body - continue; - - if( count>0 ) { - sb.append( ELEMENT_DELIM ); - sb.append( NEWLINE ); - } - sb.append( pb.getKey() ); - sb.append( KEY_VALUE_DELIM ); - sb.append( rSerializeProgramBlock(pb.getValue(), clsMap) ); - count++; - } - sb.append(NEWLINE); - return sb.toString(); - } - - private static String rSerializeProgramBlocks(ArrayList<ProgramBlock> pbs, HashMap<String, byte[]> clsMap) { - StringBuilder sb = new StringBuilder(); - int count = 0; - for( ProgramBlock pb : pbs ) - { - if( count>0 ) - { - sb.append( ELEMENT_DELIM ); - sb.append(NEWLINE); - } - sb.append( rSerializeProgramBlock(pb, clsMap) ); - count++; - } - - return sb.toString(); - } - - private static String rSerializeProgramBlock( ProgramBlock pb, HashMap<String, byte[]> clsMap ) { - StringBuilder sb = new StringBuilder(); - - //handle header - if( pb instanceof WhileProgramBlock ) - sb.append( PARFOR_PB_WHILE ); - else if ( pb instanceof ForProgramBlock && !(pb instanceof ParForProgramBlock) ) - sb.append( PARFOR_PB_FOR ); - else if ( pb instanceof ParForProgramBlock ) - sb.append( PARFOR_PB_PARFOR ); - else if ( pb instanceof IfProgramBlock ) - sb.append( PARFOR_PB_IF ); - else if ( pb instanceof FunctionProgramBlock && !(pb instanceof ExternalFunctionProgramBlock) ) - sb.append( PARFOR_PB_FC ); - else if ( pb instanceof ExternalFunctionProgramBlock ) - sb.append( PARFOR_PB_EFC ); - else //all generic program blocks - sb.append( PARFOR_PB_BEGIN ); - - //handle body - if( pb instanceof WhileProgramBlock ) - { - WhileProgramBlock wpb = (WhileProgramBlock) pb; - sb.append( PARFOR_INST_BEGIN ); - sb.append( serializeInstructions( wpb.getPredicate(), clsMap ) ); - sb.append( PARFOR_INST_END ); - sb.append( COMPONENTS_DELIM ); - sb.append( PARFOR_INST_BEGIN ); - sb.append( serializeInstructions( wpb.getExitInstructions(), clsMap ) ); - sb.append( PARFOR_INST_END ); - sb.append( COMPONENTS_DELIM ); - sb.append( PARFOR_PBS_BEGIN ); - sb.append( rSerializeProgramBlocks( wpb.getChildBlocks(), clsMap) ); - sb.append( PARFOR_PBS_END ); - } - else if ( pb instanceof ForProgramBlock && !(pb instanceof ParForProgramBlock ) ) - { - ForProgramBlock fpb = (ForProgramBlock) pb; - sb.append( fpb.getIterVar() ); - sb.append( COMPONENTS_DELIM ); - sb.append( PARFOR_INST_BEGIN ); - sb.append( serializeInstructions( fpb.getFromInstructions(), clsMap ) ); - sb.append( PARFOR_INST_END ); - sb.append( COMPONENTS_DELIM ); - sb.append( PARFOR_INST_BEGIN ); - sb.append( serializeInstructions(fpb.getToInstructions(), clsMap) ); - sb.append( PARFOR_INST_END ); - sb.append( COMPONENTS_DELIM ); - sb.append( PARFOR_INST_BEGIN ); - sb.append( serializeInstructions(fpb.getIncrementInstructions(), clsMap) ); - sb.append( PARFOR_INST_END ); - sb.append( COMPONENTS_DELIM ); - sb.append( PARFOR_INST_BEGIN ); - sb.append( serializeInstructions(fpb.getExitInstructions(), clsMap) ); - sb.append( PARFOR_INST_END ); - sb.append( COMPONENTS_DELIM ); - sb.append( PARFOR_PBS_BEGIN ); - sb.append( rSerializeProgramBlocks(fpb.getChildBlocks(), clsMap) ); - sb.append( PARFOR_PBS_END ); - } - else if ( pb instanceof ParForProgramBlock ) - { - ParForProgramBlock pfpb = (ParForProgramBlock) pb; - - //check for nested remote ParFOR - if( PExecMode.valueOf( pfpb.getParForParams().get( ParForStatementBlock.EXEC_MODE )) == PExecMode.REMOTE_MR ) - throw new DMLRuntimeException( NOT_SUPPORTED_MR_PARFOR ); - - sb.append( pfpb.getIterVar() ); - sb.append( COMPONENTS_DELIM ); - sb.append( serializeResultVariables( pfpb.getResultVariables()) ); - sb.append( COMPONENTS_DELIM ); - sb.append( serializeStringHashMap( pfpb.getParForParams()) ); //parameters of nested parfor - sb.append( COMPONENTS_DELIM ); - sb.append( PARFOR_INST_BEGIN ); - sb.append( serializeInstructions(pfpb.getFromInstructions(), clsMap) ); - sb.append( PARFOR_INST_END ); - sb.append( COMPONENTS_DELIM ); - sb.append( PARFOR_INST_BEGIN ); - sb.append( serializeInstructions(pfpb.getToInstructions(), clsMap) ); - sb.append( PARFOR_INST_END ); - sb.append( COMPONENTS_DELIM ); - sb.append( PARFOR_INST_BEGIN ); - sb.append( serializeInstructions(pfpb.getIncrementInstructions(), clsMap) ); - sb.append( PARFOR_INST_END ); - sb.append( COMPONENTS_DELIM ); - sb.append( PARFOR_INST_BEGIN ); - sb.append( serializeInstructions(pfpb.getExitInstructions(), clsMap) ); - sb.append( PARFOR_INST_END ); - sb.append( COMPONENTS_DELIM ); - sb.append( PARFOR_PBS_BEGIN ); - sb.append( rSerializeProgramBlocks( pfpb.getChildBlocks(), clsMap ) ); - sb.append( PARFOR_PBS_END ); - } - else if ( pb instanceof IfProgramBlock ) - { - IfProgramBlock ipb = (IfProgramBlock) pb; - sb.append( PARFOR_INST_BEGIN ); - sb.append( serializeInstructions(ipb.getPredicate(), clsMap) ); - sb.append( PARFOR_INST_END ); - sb.append( COMPONENTS_DELIM ); - sb.append( PARFOR_INST_BEGIN ); - sb.append( serializeInstructions(ipb.getExitInstructions(), clsMap) ); - sb.append( PARFOR_INST_END ); - sb.append( COMPONENTS_DELIM ); - sb.append( PARFOR_PBS_BEGIN ); - sb.append( rSerializeProgramBlocks(ipb.getChildBlocksIfBody(), clsMap) ); - sb.append( PARFOR_PBS_END ); - sb.append( COMPONENTS_DELIM ); - sb.append( PARFOR_PBS_BEGIN ); - sb.append( rSerializeProgramBlocks(ipb.getChildBlocksElseBody(), clsMap) ); - sb.append( PARFOR_PBS_END ); - } - else if( pb instanceof FunctionProgramBlock && !(pb instanceof ExternalFunctionProgramBlock) ) - { - FunctionProgramBlock fpb = (FunctionProgramBlock) pb; - - sb.append( serializeDataIdentifiers( fpb.getInputParams() ) ); - sb.append( COMPONENTS_DELIM ); - sb.append( serializeDataIdentifiers( fpb.getOutputParams() ) ); - sb.append( COMPONENTS_DELIM ); - sb.append( PARFOR_INST_BEGIN ); - sb.append( serializeInstructions(fpb.getInstructions(), clsMap) ); - sb.append( PARFOR_INST_END ); - sb.append( COMPONENTS_DELIM ); - sb.append( PARFOR_PBS_BEGIN ); - sb.append( rSerializeProgramBlocks(fpb.getChildBlocks(), clsMap) ); - sb.append( PARFOR_PBS_END ); - sb.append( COMPONENTS_DELIM ); - } - else if( pb instanceof ExternalFunctionProgramBlock ) - { - if( !(pb instanceof ExternalFunctionProgramBlockCP) ) - { - throw new DMLRuntimeException( NOT_SUPPORTED_EXTERNALFUNCTION_PB ); - } - - ExternalFunctionProgramBlockCP fpb = (ExternalFunctionProgramBlockCP) pb; - - sb.append( serializeDataIdentifiers( fpb.getInputParams() ) ); - sb.append( COMPONENTS_DELIM ); - sb.append( serializeDataIdentifiers( fpb.getOutputParams() ) ); - sb.append( COMPONENTS_DELIM ); - sb.append( serializeStringHashMap( fpb.getOtherParams() ) ); - sb.append( COMPONENTS_DELIM ); - sb.append( fpb.getBaseDir() ); - sb.append( COMPONENTS_DELIM ); - - sb.append( PARFOR_INST_BEGIN ); - //create on construction anyway - sb.append( PARFOR_INST_END ); - sb.append( COMPONENTS_DELIM ); - sb.append( PARFOR_PBS_BEGIN ); - sb.append( rSerializeProgramBlocks(fpb.getChildBlocks(), clsMap) ); - sb.append( PARFOR_PBS_END ); - } - else //all generic program blocks - { - sb.append( PARFOR_INST_BEGIN ); - sb.append( serializeInstructions(pb.getInstructions(), clsMap) ); - sb.append( PARFOR_INST_END ); - } - - - //handle end - sb.append( PARFOR_PB_END ); - - return sb.toString(); - } - - - //////////////////////////////// - // PARSING - //////////////////////////////// - public static ParForBody parseParForBody( String in, int id ) { - return parseParForBody(in, id, false); - } - - public static ParForBody parseParForBody( String in, int id, boolean inSpark ) { - ParForBody body = new ParForBody(); - - //header elimination - String tmpin = in.replaceAll(NEWLINE, ""); //normalization - tmpin = tmpin.substring(PARFORBODY_BEGIN.length(),tmpin.length()-PARFORBODY_END.length()); //remove start/end - HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(tmpin, COMPONENTS_DELIM); - - //handle DMLScript UUID (NOTE: set directly in DMLScript) - //(master UUID is used for all nodes (in order to simply cleanup)) - DMLScript.setUUID( st.nextToken() ); - - //handle DML config (NOTE: set directly in ConfigurationManager) - String confStr = st.nextToken(); - JobConf job = ConfigurationManager.getCachedJobConf(); - if( !InfrastructureAnalyzer.isLocalMode(job) ) { - if( confStr != null && !confStr.trim().isEmpty() ) { - DMLConfig dmlconf = DMLConfig.parseDMLConfig(confStr); - CompilerConfig cconf = OptimizerUtils.constructCompilerConfig(dmlconf); - ConfigurationManager.setLocalConfig(dmlconf); - ConfigurationManager.setLocalConfig(cconf); - } - //init internal configuration w/ parsed or default config - ParForProgramBlock.initInternalConfigurations( - ConfigurationManager.getDMLConfig()); - } - - //handle additional configs - String aconfs = st.nextToken(); - if( !inSpark ) - parseAndSetAdditionalConfigurations( aconfs ); - - //handle program - String progStr = st.nextToken(); - Program prog = parseProgram( progStr, id ); - - //handle result variable names - String rvarStr = st.nextToken(); - ArrayList<ResultVar> rvars = parseResultVariables(rvarStr); - body.setResultVariables(rvars); - - //handle execution context - String ecStr = st.nextToken(); - ExecutionContext ec = parseExecutionContext( ecStr, prog ); - - //handle program blocks - String spbs = st.nextToken(); - ArrayList<ProgramBlock> pbs = rParseProgramBlocks(spbs, prog, id); - - body.setChildBlocks( pbs ); - body.setEc( ec ); - - return body; - } - - public static Program parseProgram( String in, int id ) { - String lin = in.substring( PARFOR_PROG_BEGIN.length(),in.length()-PARFOR_PROG_END.length()).trim(); - - Program prog = new Program(); - HashMap<String,FunctionProgramBlock> fc = parseFunctionProgramBlocks(lin, prog, id); - - for( Entry<String,FunctionProgramBlock> e : fc.entrySet() ) - { - String[] keypart = e.getKey().split( Program.KEY_DELIM ); - String namespace = keypart[0]; - String name = keypart[1]; - - prog.addFunctionProgramBlock(namespace, name, e.getValue()); - } - - return prog; - } - - private static LocalVariableMap parseVariables(String in) { - LocalVariableMap ret = null; - - if( in.length()> PARFOR_VARS_BEGIN.length() + PARFOR_VARS_END.length()) - { - String varStr = in.substring( PARFOR_VARS_BEGIN.length(),in.length()-PARFOR_VARS_END.length()).trim(); - ret = LocalVariableMap.deserialize(varStr); - } - else //empty input symbol table - { - ret = new LocalVariableMap(); - } - - return ret; - } - - private static HashMap<String,FunctionProgramBlock> parseFunctionProgramBlocks( String in, Program prog, int id ) { - HashMap<String,FunctionProgramBlock> ret = new HashMap<>(); - HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer( in, ELEMENT_DELIM ); - - while( st.hasMoreTokens() ) - { - String lvar = st.nextToken(); //with ID = CP_CHILD_THREAD+id for current use - - //put first copy into prog (for direct use) - int index = lvar.indexOf( KEY_VALUE_DELIM ); - String tmp1 = lvar.substring(0, index); // + CP_CHILD_THREAD+id; - String tmp2 = lvar.substring(index + 1); - ret.put(tmp1, (FunctionProgramBlock)rParseProgramBlock(tmp2, prog, id)); - } - - return ret; - } - - private static ArrayList<ProgramBlock> rParseProgramBlocks(String in, Program prog, int id) { - ArrayList<ProgramBlock> pbs = new ArrayList<>(); - String tmpdata = in.substring(PARFOR_PBS_BEGIN.length(),in.length()-PARFOR_PBS_END.length()); - HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(tmpdata, ELEMENT_DELIM); - - while( st.hasMoreTokens() ) - { - String tmp = st.nextToken(); - pbs.add( rParseProgramBlock( tmp, prog, id ) ); - } - - return pbs; - } - - private static ProgramBlock rParseProgramBlock( String in, Program prog, int id ) { - ProgramBlock pb = null; - - if( in.startsWith( PARFOR_PB_WHILE ) ) - pb = rParseWhileProgramBlock( in, prog, id ); - else if ( in.startsWith(PARFOR_PB_FOR ) ) - pb = rParseForProgramBlock( in, prog, id ); - else if ( in.startsWith(PARFOR_PB_PARFOR ) ) - pb = rParseParForProgramBlock( in, prog, id ); - else if ( in.startsWith(PARFOR_PB_IF ) ) - pb = rParseIfProgramBlock( in, prog, id ); - else if ( in.startsWith(PARFOR_PB_FC ) ) - pb = rParseFunctionProgramBlock( in, prog, id ); - else if ( in.startsWith(PARFOR_PB_EFC ) ) - pb = rParseExternalFunctionProgramBlock( in, prog, id ); - else if ( in.startsWith(PARFOR_PB_BEGIN ) ) - pb = rParseGenericProgramBlock( in, prog, id ); - else - throw new DMLRuntimeException( NOT_SUPPORTED_PB+" "+in ); - - return pb; - } - - private static WhileProgramBlock rParseWhileProgramBlock( String in, Program prog, int id ) { - String lin = in.substring( PARFOR_PB_WHILE.length(),in.length()-PARFOR_PB_END.length()); - HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM); - - //predicate instructions - ArrayList<Instruction> inst = parseInstructions(st.nextToken(),id); - - //exit instructions - ArrayList<Instruction> exit = parseInstructions(st.nextToken(),id); - - //program blocks - ArrayList<ProgramBlock> pbs = rParseProgramBlocks(st.nextToken(), prog, id); - - WhileProgramBlock wpb = new WhileProgramBlock(prog,inst); - wpb.setExitInstructions2(exit); - wpb.setChildBlocks(pbs); - - return wpb; - } - - private static ForProgramBlock rParseForProgramBlock( String in, Program prog, int id ) { - String lin = in.substring( PARFOR_PB_FOR.length(),in.length()-PARFOR_PB_END.length()); - HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM); - - //inputs - String iterVar = st.nextToken(); - - //instructions - ArrayList<Instruction> from = parseInstructions(st.nextToken(),id); - ArrayList<Instruction> to = parseInstructions(st.nextToken(),id); - ArrayList<Instruction> incr = parseInstructions(st.nextToken(),id); - - //exit instructions - ArrayList<Instruction> exit = parseInstructions(st.nextToken(),id); - - //program blocks - ArrayList<ProgramBlock> pbs = rParseProgramBlocks(st.nextToken(), prog, id); - - ForProgramBlock fpb = new ForProgramBlock(prog, iterVar); - fpb.setFromInstructions(from); - fpb.setToInstructions(to); - fpb.setIncrementInstructions(incr); - fpb.setExitInstructions(exit); - fpb.setChildBlocks(pbs); - - return fpb; - } - - private static ParForProgramBlock rParseParForProgramBlock( String in, Program prog, int id ) { - String lin = in.substring( PARFOR_PB_PARFOR.length(),in.length()-PARFOR_PB_END.length()); - HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM); - - //inputs - String iterVar = st.nextToken(); - ArrayList<ResultVar> resultVars = parseResultVariables(st.nextToken()); - HashMap<String,String> params = parseStringHashMap(st.nextToken()); - - //instructions - ArrayList<Instruction> from = parseInstructions(st.nextToken(), 0); - ArrayList<Instruction> to = parseInstructions(st.nextToken(), 0); - ArrayList<Instruction> incr = parseInstructions(st.nextToken(), 0); - - //exit instructions - ArrayList<Instruction> exit = parseInstructions(st.nextToken(), 0); - - //program blocks //reset id to preinit state, replaced during exec - ArrayList<ProgramBlock> pbs = rParseProgramBlocks(st.nextToken(), prog, 0); - - ParForProgramBlock pfpb = new ParForProgramBlock(id, prog, iterVar, params, resultVars); - pfpb.disableOptimization(); //already done in top-level parfor - pfpb.setFromInstructions(from); - pfpb.setToInstructions(to); - pfpb.setIncrementInstructions(incr); - pfpb.setExitInstructions(exit); - pfpb.setChildBlocks(pbs); - - return pfpb; - } - - private static IfProgramBlock rParseIfProgramBlock( String in, Program prog, int id ) { - String lin = in.substring( PARFOR_PB_IF.length(),in.length()-PARFOR_PB_END.length()); - HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM); - - //predicate instructions - ArrayList<Instruction> inst = parseInstructions(st.nextToken(),id); - - //exit instructions - ArrayList<Instruction> exit = parseInstructions(st.nextToken(),id); - - //program blocks: if and else - ArrayList<ProgramBlock> pbs1 = rParseProgramBlocks(st.nextToken(), prog, id); - ArrayList<ProgramBlock> pbs2 = rParseProgramBlocks(st.nextToken(), prog, id); - - IfProgramBlock ipb = new IfProgramBlock(prog,inst); - ipb.setExitInstructions2(exit); - ipb.setChildBlocksIfBody(pbs1); - ipb.setChildBlocksElseBody(pbs2); - - return ipb; - } - - private static FunctionProgramBlock rParseFunctionProgramBlock( String in, Program prog, int id ) { - String lin = in.substring( PARFOR_PB_FC.length(),in.length()-PARFOR_PB_END.length()); - HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM); - - //inputs and outputs - ArrayList<DataIdentifier> dat1 = parseDataIdentifiers(st.nextToken()); - ArrayList<DataIdentifier> dat2 = parseDataIdentifiers(st.nextToken()); - - //instructions - ArrayList<Instruction> inst = parseInstructions(st.nextToken(),id); - - //program blocks - ArrayList<ProgramBlock> pbs = rParseProgramBlocks(st.nextToken(), prog, id); - - ArrayList<DataIdentifier> tmp1 = new ArrayList<>(dat1); - ArrayList<DataIdentifier> tmp2 = new ArrayList<>(dat2); - FunctionProgramBlock fpb = new FunctionProgramBlock(prog, tmp1, tmp2); - fpb.setInstructions(inst); - fpb.setChildBlocks(pbs); - - return fpb; - } - - private static ExternalFunctionProgramBlock rParseExternalFunctionProgramBlock( String in, Program prog, int id ) { - String lin = in.substring( PARFOR_PB_EFC.length(),in.length()-PARFOR_PB_END.length()); - HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM); - - //LocalVariableMap vars = parseVariables(st.nextToken()); - - //inputs, outputs and params - ArrayList<DataIdentifier> dat1 = parseDataIdentifiers(st.nextToken()); - ArrayList<DataIdentifier> dat2 = parseDataIdentifiers(st.nextToken()); - HashMap<String,String> dat3 = parseStringHashMap(st.nextToken()); - - //basedir - String basedir = st.nextToken(); - - //instructions (required for removing INST BEGIN, END) - parseInstructions(st.nextToken(),id); - - //program blocks - ArrayList<ProgramBlock> pbs = rParseProgramBlocks(st.nextToken(), prog, id); - - ArrayList<DataIdentifier> tmp1 = new ArrayList<>(dat1); - ArrayList<DataIdentifier> tmp2 = new ArrayList<>(dat2); - - //only CP external functions, because no nested MR jobs for reblocks - ExternalFunctionProgramBlockCP efpb = new ExternalFunctionProgramBlockCP(prog, tmp1, tmp2, dat3, basedir); - efpb.setChildBlocks(pbs); - - return efpb; - } - - private static ProgramBlock rParseGenericProgramBlock( String in, Program prog, int id ) { - String lin = in.substring( PARFOR_PB_BEGIN.length(),in.length()-PARFOR_PB_END.length()); - StringTokenizer st = new StringTokenizer(lin,COMPONENTS_DELIM); - - ArrayList<Instruction> inst = parseInstructions(st.nextToken(),id); - - ProgramBlock pb = new ProgramBlock(prog); - pb.setInstructions(inst); - - return pb; - } - - private static ArrayList<Instruction> parseInstructions( String in, int id ) { - ArrayList<Instruction> insts = new ArrayList<>(); - String lin = in.substring( PARFOR_INST_BEGIN.length(),in.length()-PARFOR_INST_END.length()); - StringTokenizer st = new StringTokenizer(lin, ELEMENT_DELIM); - while(st.hasMoreTokens()) { - //Note that at this point only CP instructions and External function instruction can occur - String instStr = st.nextToken(); - try { - Instruction tmpinst = CPInstructionParser.parseSingleInstruction(instStr); - tmpinst = saveReplaceThreadID(tmpinst, CP_ROOT_THREAD_ID, CP_CHILD_THREAD+id ); - insts.add( tmpinst ); - } - catch(Exception ex) { - throw new DMLRuntimeException("Failed to parse instruction: " + instStr, ex); - } - } - return insts; - } - - private static ArrayList<ResultVar> parseResultVariables(String in) { - ArrayList<ResultVar> ret = new ArrayList<>(); - for(String var : parseStringArrayList(in)) { - boolean accum = var.endsWith("+"); - ret.add(new ResultVar(accum ? var.substring(0, var.length()-1) : var, accum)); - } - return ret; - } - - private static HashMap<String,String> parseStringHashMap( String in ) { - HashMap<String,String> vars = new HashMap<>(); - StringTokenizer st = new StringTokenizer(in,ELEMENT_DELIM); - while( st.hasMoreTokens() ) { - String lin = st.nextToken(); - int index = lin.indexOf( KEY_VALUE_DELIM ); - String tmp1 = lin.substring(0, index); - String tmp2 = lin.substring(index + 1); - vars.put(tmp1, tmp2); - } - return vars; - } - - private static ArrayList<String> parseStringArrayList( String in ) - { - ArrayList<String> vars = new ArrayList<>(); - StringTokenizer st = new StringTokenizer(in,ELEMENT_DELIM); - while( st.hasMoreTokens() ) { - String tmp = st.nextToken(); - vars.add(tmp); - } - - return vars; - } - - private static ArrayList<DataIdentifier> parseDataIdentifiers( String in ) - { - ArrayList<DataIdentifier> vars = new ArrayList<>(); - StringTokenizer st = new StringTokenizer(in, ELEMENT_DELIM); - while( st.hasMoreTokens() ) { - String tmp = st.nextToken(); - DataIdentifier dat = parseDataIdentifier( tmp ); - vars.add(dat); - } - - return vars; - } - - private static DataIdentifier parseDataIdentifier( String in ) - { - StringTokenizer st = new StringTokenizer(in, DATA_FIELD_DELIM); - String name = st.nextToken(); - DataType dt = DataType.valueOf(st.nextToken()); - ValueType vt = ValueType.valueOf(st.nextToken()); - - DataIdentifier dat = new DataIdentifier(name); - dat.setDataType(dt); - dat.setValueType(vt); - - return dat; - } - - /** - * NOTE: MRJobConfiguration cannot be used for the general case because program blocks and - * related symbol tables can be hierarchically structured. - * - * @param in data object as string - * @return array of objects - */ - public static Object[] parseDataObject(String in) { - Object[] ret = new Object[2]; - - StringTokenizer st = new StringTokenizer(in, DATA_FIELD_DELIM ); - String name = st.nextToken(); - DataType datatype = DataType.valueOf( st.nextToken() ); - ValueType valuetype = ValueType.valueOf( st.nextToken() ); - String valString = st.hasMoreTokens() ? st.nextToken() : ""; - Data dat = null; - switch( datatype ) - { - case SCALAR: - { - switch ( valuetype ) - { - case INT: - dat = new IntObject(Long.parseLong(valString)); - break; - case DOUBLE: - dat = new DoubleObject(Double.parseDouble(valString)); - break; - case BOOLEAN: - dat = new BooleanObject(Boolean.parseBoolean(valString)); - break; - case STRING: - dat = new StringObject(valString); - break; - default: - throw new DMLRuntimeException("Unable to parse valuetype "+valuetype); - } - break; - } - case MATRIX: - { - MatrixObject mo = new MatrixObject(valuetype,valString); - long rows = Long.parseLong( st.nextToken() ); - long cols = Long.parseLong( st.nextToken() ); - int brows = Integer.parseInt( st.nextToken() ); - int bcols = Integer.parseInt( st.nextToken() ); - long nnz = Long.parseLong( st.nextToken() ); - InputInfo iin = InputInfo.stringToInputInfo( st.nextToken() ); - OutputInfo oin = OutputInfo.stringToOutputInfo( st.nextToken() ); - PartitionFormat partFormat = PartitionFormat.valueOf( st.nextToken() ); - UpdateType inplace = UpdateType.valueOf( st.nextToken() ); - MatrixCharacteristics mc = new MatrixCharacteristics(rows, cols, brows, bcols, nnz); - MetaDataFormat md = new MetaDataFormat( mc, oin, iin ); - mo.setMetaData( md ); - if( partFormat._dpf != PDataPartitionFormat.NONE ) - mo.setPartitioned( partFormat._dpf, partFormat._N ); - mo.setUpdateType(inplace); - dat = mo; - break; - } - default: - throw new DMLRuntimeException("Unable to parse datatype "+datatype); - } - - ret[0] = name; - ret[1] = dat; - return ret; - } - - private static ExecutionContext parseExecutionContext(String in, Program prog) - { - ExecutionContext ec = null; - - String lin = in.substring(PARFOR_EC_BEGIN.length(),in.length()-PARFOR_EC_END.length()).trim(); - - if( !lin.equals( EMPTY ) ) - { - LocalVariableMap vars = parseVariables(lin); - ec = ExecutionContextFactory.createContext( false, prog ); - ec.setVariables(vars); - } - - return ec; - } - - private static void parseAndSetAdditionalConfigurations(String conf) { - String[] statsFlag = conf.split("="); - DMLScript.STATISTICS = Boolean.parseBoolean(statsFlag[1]); - } - - ////////// - // CUSTOM SAFE LITERAL REPLACEMENT - - /** - * In-place replacement of thread ids in filenames, functions names etc - * - * @param inst instruction - * @param pattern ? - * @param replacement string replacement - * @return instruction - */ - private static Instruction saveReplaceThreadID( Instruction inst, String pattern, String replacement ) - { - //currently known, relevant instructions: createvar, rand, seq, extfunct, - if( inst instanceof MRJobInstruction ) - { - //update dims file, and internal string representations of rand/seq instructions - MRJobInstruction mrinst = (MRJobInstruction)inst; - mrinst.updateInstructionThreadID(pattern, replacement); - } - else if ( inst instanceof VariableCPInstruction ) //createvar, setfilename - { - //update in-memory representation - inst.updateInstructionThreadID(pattern, replacement); - } - //NOTE> //Rand, seq in CP not required - //else if( inst.toString().contains(pattern) ) - // throw new DMLRuntimeException( "DEBUG: Missed thread id replacement: "+inst ); - - return inst; - } - - public static String saveReplaceFilenameThreadID(String fname, String pattern, String replace) - { - //save replace necessary in order to account for the possibility that read variables have our prefix in the absolute path - //replace the last match only, because (1) we have at most one _t0 and (2) always concatenated to the end. - int pos = fname.lastIndexOf(pattern); - if( pos < 0 ) - return fname; - return fname.substring(0, pos) + replace + fname.substring(pos+pattern.length()); - } - - - ////////// - // CUSTOM HIERARCHICAL TOKENIZER - - - /** - * Custom StringTokenizer for splitting strings of hierarchies. The basic idea is to - * search for delim-Strings on the same hierarchy level, while delims of lower hierarchy - * levels are skipped. - * - */ - private static class HierarchyAwareStringTokenizer //extends StringTokenizer - { - private String _str = null; - private String _del = null; - private int _off = -1; - - public HierarchyAwareStringTokenizer( String in, String delim ) - { - //super(in); - _str = in; - _del = delim; - _off = delim.length(); - } - - public boolean hasMoreTokens() - { - return (_str.length() > 0); - } - - public String nextToken() - { - int nextDelim = determineNextSameLevelIndexOf(_str, _del); - String token = null; - if(nextDelim < 0) - { - nextDelim = _str.length(); - _off = 0; - } - token = _str.substring(0,nextDelim); - _str = _str.substring( nextDelim + _off ); - return token; - } - - private static int determineNextSameLevelIndexOf( String data, String pattern ) - { - String tmpdata = data; - int index = 0; - int count = 0; - int off=0,i1,i2,i3,min; - - while(true) - { - i1 = tmpdata.indexOf(pattern); - i2 = tmpdata.indexOf(LEVELIN); - i3 = tmpdata.indexOf(LEVELOUT); - - if( i1 < 0 ) return i1; //no pattern found at all - - min = i1; //min >= 0 by definition - if( i2 >= 0 ) min = Math.min(min, i2); - if( i3 >= 0 ) min = Math.min(min, i3); - - //stack maintenance - if( i1 == min && count == 0 ) - return index+i1; - else if( i2 == min ) - { - count++; - off = LEVELIN.length(); - } - else if( i3 == min ) - { - count--; - off = LEVELOUT.length(); - } - - //prune investigated string - index += min+off; - tmpdata = tmpdata.substring(min+off); - } - } - } -}
http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java index b51df84..1f25032 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java @@ -57,6 +57,7 @@ import org.apache.sysml.runtime.matrix.data.OutputInfo; import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames; import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration; import org.apache.sysml.runtime.util.MapReduceTool; +import org.apache.sysml.runtime.util.ProgramConverter; import org.apache.sysml.utils.Statistics; import org.apache.sysml.yarn.DMLAppMasterUtils; http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java index 885b2b7..66b4283 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java @@ -37,14 +37,13 @@ import org.apache.sysml.runtime.controlprogram.caching.CacheableData; import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; import org.apache.sysml.runtime.controlprogram.parfor.Task.TaskType; import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; -import org.apache.sysml.runtime.controlprogram.parfor.util.IDHandler; import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableBlock; import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableCell; import org.apache.sysml.runtime.instructions.cp.IntObject; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.OutputInfo; -import org.apache.sysml.runtime.util.LocalFileUtils; +import org.apache.sysml.runtime.util.ProgramConverter; import scala.Tuple2; @@ -151,23 +150,9 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF _numTasks = 0; _numIters = 0; - //init and register-cleanup of buffer pool (in parfor spark, multiple tasks might - //share the process-local, i.e., per executor, buffer pool; hence we synchronize - //the initialization and immediately register the created directory for cleanup - //on process exit, i.e., executor exit, including any files created in the future. - synchronized( CacheableData.class ) { - if( !CacheableData.isCachingActive() && !InfrastructureAnalyzer.isLocalMode() ) { - //create id, executor working dir, and cache dir - String uuid = IDHandler.createDistributedUniqueID(); - LocalFileUtils.createWorkingDirectoryWithUUID( uuid ); - CacheableData.initCaching( uuid ); //incl activation and cache dir creation - CacheableData.cacheEvictionLocalFilePrefix = - CacheableData.cacheEvictionLocalFilePrefix +"_" + _workerID; - //register entire working dir for delete on shutdown - RemoteParForUtils.cleanupWorkingDirectoriesOnShutdown(); - } - } - + //setup the buffer pool + RemoteParForUtils.setupBufferPool(_workerID); + //ensure that resultvar files are not removed super.pinResultVariables(); http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java index 6086d25..3f235cc 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java @@ -48,6 +48,7 @@ import org.apache.sysml.runtime.matrix.data.OutputInfo; import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames; import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration; import org.apache.sysml.runtime.util.LocalFileUtils; +import org.apache.sysml.runtime.util.ProgramConverter; import org.apache.sysml.utils.Statistics; public class RemoteDPParWorkerReducer extends ParWorker http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java index baec5a2..81a5e65 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java @@ -54,6 +54,7 @@ import org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames; import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration; import org.apache.sysml.runtime.util.MapReduceTool; +import org.apache.sysml.runtime.util.ProgramConverter; import org.apache.sysml.utils.Statistics; import org.apache.sysml.yarn.DMLAppMasterUtils; http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java index ab29148..fd0b9eb 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java @@ -37,8 +37,7 @@ import org.apache.sysml.runtime.codegen.CodegenUtils; import org.apache.sysml.runtime.controlprogram.caching.CacheBlock; import org.apache.sysml.runtime.controlprogram.caching.CacheableData; import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; -import org.apache.sysml.runtime.controlprogram.parfor.util.IDHandler; -import org.apache.sysml.runtime.util.LocalFileUtils; +import org.apache.sysml.runtime.util.ProgramConverter; import org.apache.sysml.runtime.util.UtilFunctions; import scala.Tuple2; @@ -128,23 +127,9 @@ public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFun .map(v -> v._name).collect(Collectors.toList()), _ec.getVarListPartitioned()); reuseVars.reuseVariables(_jobid, _ec.getVariables(), blacklist, _brInputs, _cleanCache); - //init and register-cleanup of buffer pool (in parfor spark, multiple tasks might - //share the process-local, i.e., per executor, buffer pool; hence we synchronize - //the initialization and immediately register the created directory for cleanup - //on process exit, i.e., executor exit, including any files created in the future. - synchronized( CacheableData.class ) { - if( !CacheableData.isCachingActive() && !InfrastructureAnalyzer.isLocalMode() ) { - //create id, executor working dir, and cache dir - String uuid = IDHandler.createDistributedUniqueID(); - LocalFileUtils.createWorkingDirectoryWithUUID( uuid ); - CacheableData.initCaching( uuid ); //incl activation and cache dir creation - CacheableData.cacheEvictionLocalFilePrefix = - CacheableData.cacheEvictionLocalFilePrefix +"_" + _workerID; - //register entire working dir for delete on shutdown - RemoteParForUtils.cleanupWorkingDirectoriesOnShutdown(); - } - } - + //setup the buffer pool + RemoteParForUtils.setupBufferPool(_workerID); + //ensure that resultvar files are not removed super.pinResultVariables(); http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java index fb2adc2..2cf35c7 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java @@ -45,8 +45,10 @@ import org.apache.sysml.runtime.controlprogram.caching.CacheableData; import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; import org.apache.sysml.runtime.controlprogram.parfor.stat.Stat; +import org.apache.sysml.runtime.controlprogram.parfor.util.IDHandler; import org.apache.sysml.runtime.instructions.cp.Data; import org.apache.sysml.runtime.util.LocalFileUtils; +import org.apache.sysml.runtime.util.ProgramConverter; import org.apache.sysml.utils.Statistics; /** @@ -237,6 +239,29 @@ public class RemoteParForUtils //create return array return tmp.values().toArray(new LocalVariableMap[0]); } + + /** + * Init and register-cleanup of buffer pool + * @param workerID worker id + * @throws IOException exception + */ + public static void setupBufferPool(long workerID) throws IOException { + //init and register-cleanup of buffer pool (in spark, multiple tasks might + //share the process-local, i.e., per executor, buffer pool; hence we synchronize + //the initialization and immediately register the created directory for cleanup + //on process exit, i.e., executor exit, including any files created in the future. + synchronized(CacheableData.class) { + if (!CacheableData.isCachingActive() && !InfrastructureAnalyzer.isLocalMode()) { + //create id, executor working dir, and cache dir + String uuid = IDHandler.createDistributedUniqueID(); + LocalFileUtils.createWorkingDirectoryWithUUID(uuid); + CacheableData.initCaching(uuid); //incl activation and cache dir creation + CacheableData.cacheEvictionLocalFilePrefix = CacheableData.cacheEvictionLocalFilePrefix + "_" + workerID; + //register entire working dir for delete on shutdown + RemoteParForUtils.cleanupWorkingDirectoriesOnShutdown(); + } + } + } /** * Task to be registered as shutdown hook in order to delete the http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParWorkerMapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParWorkerMapper.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParWorkerMapper.java index 7029061..0bdb92e 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParWorkerMapper.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParWorkerMapper.java @@ -42,6 +42,7 @@ import org.apache.sysml.runtime.instructions.cp.Data; import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames; import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration; import org.apache.sysml.runtime.util.LocalFileUtils; +import org.apache.sysml.runtime.util.ProgramConverter; import org.apache.sysml.utils.Statistics; /** http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java index ed3b758..d345d01 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java @@ -81,7 +81,7 @@ import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; import org.apache.sysml.runtime.controlprogram.caching.MatrixObject.UpdateType; import org.apache.sysml.runtime.controlprogram.context.ExecutionContext; import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; -import org.apache.sysml.runtime.controlprogram.parfor.ProgramConverter; +import org.apache.sysml.runtime.util.ProgramConverter; import org.apache.sysml.runtime.controlprogram.parfor.ResultMergeLocalFile; import org.apache.sysml.runtime.controlprogram.parfor.opt.CostEstimator.ExcludeType; import org.apache.sysml.runtime.controlprogram.parfor.opt.CostEstimator.TestMeasure; @@ -438,7 +438,7 @@ public class OptimizerRuleBased extends Optimizer _numEvaluatedPlans++; LOG.debug(getOptMode()+" OPT: rewrite 'set data partitioner' - result="+pdp.toString()+ - " ("+ProgramConverter.serializeStringCollection(partitionedMatrices.keySet())+")" ); + " ("+Arrays.toString(partitionedMatrices.keySet().toArray())+")" ); return blockwise; } @@ -1809,8 +1809,8 @@ public class OptimizerRuleBased extends Optimizer pfpb.setRuntimePiggybacking(apply); _numEvaluatedPlans++; - LOG.debug(getOptMode()+" OPT: rewrite 'enable runtime piggybacking' - result="+apply+ - " ("+ProgramConverter.serializeStringCollection(sharedVars)+")" ); + LOG.debug(getOptMode()+" OPT: rewrite 'enable runtime piggybacking' - result=" + +apply+" ("+Arrays.toString(sharedVars.toArray())+")" ); } protected boolean rHasSharedMRInput( OptNode n, Set<String> inputVars, Set<String> partitionedVars, HashSet<String> sharedVars ) @@ -1931,8 +1931,8 @@ public class OptimizerRuleBased extends Optimizer } _numEvaluatedPlans++; - LOG.debug(getOptMode()+" OPT: rewrite 'inject spark input repartition' - result="+ret.size()+ - " ("+ProgramConverter.serializeStringCollection(ret)+")" ); + LOG.debug(getOptMode()+" OPT: rewrite 'inject spark input repartition' - result=" + +ret.size()+" ("+Arrays.toString(ret.toArray())+")" ); } private void rCollectZipmmPartitioningCandidates( OptNode n, HashSet<String> cand ) @@ -2008,8 +2008,8 @@ public class OptimizerRuleBased extends Optimizer } _numEvaluatedPlans++; - LOG.debug(getOptMode()+" OPT: rewrite 'set spark eager rdd caching' - result="+ret.size()+ - " ("+ProgramConverter.serializeStringCollection(ret)+")" ); + LOG.debug(getOptMode()+" OPT: rewrite 'set spark eager rdd caching' - result=" + +ret.size()+" ("+Arrays.toString(ret.toArray())+")" ); } /////// http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/runtime/instructions/MRJobInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/MRJobInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/MRJobInstruction.java index 65e9d3a..9d6f133 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/MRJobInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/MRJobInstruction.java @@ -36,7 +36,7 @@ import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartition import org.apache.sysml.runtime.controlprogram.caching.FrameObject; import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; import org.apache.sysml.runtime.controlprogram.context.ExecutionContext; -import org.apache.sysml.runtime.controlprogram.parfor.ProgramConverter; +import org.apache.sysml.runtime.util.ProgramConverter; import org.apache.sysml.runtime.instructions.cp.Data; import org.apache.sysml.runtime.matrix.JobReturn; import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
