Repository: systemml
Updated Branches:
  refs/heads/master d71a7d359 -> a5c834b27
[SYSTEMML-1727] Fix invalid mvvar instruction generation for pwrites

This patch fixes the generation of runtime instructions (piggybacking) by
removing the generation of mvvar instructions for persistent writes that are
the only consumer of a transient read and that write in binary-block format.
Mvvar (move variable) instructions are invalid here for two reasons. First,
in the case of different file URI schemes, the mvvar cannot be realized as a
rename (a metadata operation) but instead reads the matrix into CP, which
leads to OOMs for large data (see the sketch after the diff below). Second,
in the case of a subsequent use of the variable, the temporary file no longer
exists, which potentially leads to file-not-found exceptions.

This patch fixes these issues by globally disabling the generation of mvvar
instructions for persistent writes. A more fine-grained handling would be
possible but would require a major rewrite of how we currently generate
instructions. This seems unnecessary, given that in CP and Spark,
intermediates rarely reside on HDFS anyway.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/9e7ce7ba
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/9e7ce7ba
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/9e7ce7ba

Branch: refs/heads/master
Commit: 9e7ce7ba3fd2871ac9da2f06007b31f3780d37de
Parents: d71a7d3
Author: Matthias Boehm <[email protected]>
Authored: Tue Jun 20 19:41:37 2017 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Tue Jun 20 19:41:37 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/sysml/lops/compile/Dag.java | 129 ++-----------------
 1 file changed, 13 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/9e7ce7ba/src/main/java/org/apache/sysml/lops/compile/Dag.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/lops/compile/Dag.java b/src/main/java/org/apache/sysml/lops/compile/Dag.java
index ff83671..f108d2f 100644
--- a/src/main/java/org/apache/sysml/lops/compile/Dag.java
+++ b/src/main/java/org/apache/sysml/lops/compile/Dag.java
@@ -364,10 +364,6 @@ public class Dag<N extends Lop>
 		// (currently required for specific cases of external functions)
 		for (String varName : sb.liveIn().getVariableNames()) {
 			if (!sb.liveOut().containsVariable(varName)) {
-				// DataType dt = in.getVariable(varName).getDataType();
-				// if( !(dt==DataType.MATRIX || dt==DataType.UNKNOWN) )
-				// continue; //skip rm instructions for non-matrix objects
-
 				inst = VariableCPInstruction.prepareRemoveInstruction(varName);
 				
 				inst.setLocation(sb.getEndLine(), sb.getEndLine(), -1, -1);
@@ -377,26 +373,6 @@ public class Dag<N extends Lop>
 				LOG.trace(" Adding " + inst.toString());
 			}
 		}
-		
-		// RULE 2: if in KILL and not in IN and not in OUT, then there should be an rmvar or rmfilevar inst
-		// (currently required for specific cases of nested loops)
-		// i.e., local variables which are created within the block, and used entirely within the block
-		/*for (String varName : sb.getKill().getVariableNames()) {
-			if ((!sb.liveIn().containsVariable(varName))
-					&& (!sb.liveOut().containsVariable(varName))) {
-				// DataType dt =
-				// sb.getKill().getVariable(varName).getDataType();
-				// if( !(dt==DataType.MATRIX || dt==DataType.UNKNOWN) )
-				// continue; //skip rm instructions for non-matrix objects
-
-				inst = createCleanupInstruction(varName);
-				deleteInst.add(inst);
-
-				if (DMLScript.DEBUG)
-					System.out.println("Adding instruction (r2) "
-							+ inst.toString());
-			}
-		}*/
 	}
 
 	private static ArrayList<ArrayList<Lop>> createNodeVectors(int size) {
@@ -709,20 +685,10 @@ public class Dag<N extends Lop>
 		Lop in = node.getInputs().get(0);
 		Format nodeFormat = node.getOutputParameters().getFormat();
 		
-		// Case of a transient read feeding into only one output persistent binaryblock write
-		// Move the temporary file on HDFS to required persistent location, insteadof copying.
-		if ( in.getExecLocation() == ExecLocation.Data && in.getOutputs().size() == 1
-			&& !((Data)node).isTransient()
-			&& ((Data)in).isTransient()
-			&& ((Data)in).getOutputParameters().isBlocked()
-			&& node.getOutputParameters().isBlocked() ) {
-			return false;
-		}
-		
 		//send write lop to MR if (1) it is marked with exec type MR (based on its memory estimate), or
 		//(2) if the input lop is in MR and the write format allows to pack it into the same job (this does
 		//not apply to csv write because MR csvwrite is a separate MR job type)
-		return (node.getExecType() == ExecType.MR 
+		return (node.getExecType() == ExecType.MR
 			|| (in.getExecType() == ExecType.MR && nodeFormat != Format.CSV));
 	}
 
@@ -986,7 +952,6 @@ public class Dag<N extends Lop>
 			}
 			else {
 				// nothing here.. subsequent checks have to be performed
 				// on "node"
-				;
 			}
 		}
@@ -1022,7 +987,6 @@ public class Dag<N extends Lop>
 						&& ((Data)input).isTransient()
 						&& dnode.getOutputParameters().getLabel().equals(input.getOutputParameters().getLabel()) ) {
 					// do nothing, <code>node</code> must not processed any further.
-					;
 				}
 				else if ( execNodes.contains(input) && !isCompatible(node, input) && sendWriteLopToMR(node)) {
 					// input is in execNodes but it is not compatible with write lop. So, queue the write lop.
@@ -1101,7 +1065,6 @@ public class Dag<N extends Lop>
 					
 					// Limit the number of distributed cache inputs based on the available memory in mappers
 					double memsize = computeFootprintInMapper(node);
-					//gmrMapperFootprint += computeFootprintInMapper(node);
 					if ( gmrMapperFootprint>0 && !checkMemoryLimits(node, gmrMapperFootprint+memsize ) ) {
 						queueThisNode = true;
 						subcode = 2;
@@ -1196,10 +1159,7 @@ public class Dag<N extends Lop>
 			if ( execNodes.isEmpty() ) {
 				
 				if( !queuedNodes.isEmpty() )
-				{
-					//System.err.println("Queued nodes should be 0");
-					throw new LopsException("Queued nodes should not be 0 at this point \n");
-				}
+					throw new LopsException("Queued nodes should not be 0 at this point \n");
 				
 				if( LOG.isTraceEnabled() )
 					LOG.trace("All done! queuedNodes = "+ queuedNodes.size());
@@ -1264,7 +1224,6 @@ public class Dag<N extends Lop>
 			// we should not consider such lops in this check
 			if (isChild(tmpNode, node, IDMap)
 					&& tmpNode.getExecLocation() != ExecLocation.ControlProgram
-					//&& tmpNode.getCompatibleJobs() != LopProperties.INVALID
 					&& (tmpNode.getCompatibleJobs() & node.getCompatibleJobs()) == 0)
 				return false;
 		}
@@ -1278,7 +1237,6 @@ public class Dag<N extends Lop>
 	 * @param deleteInst list of instructions
 	 */
 	private static void excludeRemoveInstruction(String varName, ArrayList<Instruction> deleteInst) {
-		//for(Instruction inst : deleteInst) {
 		for(int i=0; i < deleteInst.size(); i++) {
 			Instruction inst = deleteInst.get(i);
 			if ((inst.getType() == INSTRUCTION_TYPE.CONTROL_PROGRAM || inst.getType() == INSTRUCTION_TYPE.SPARK)
@@ -1525,7 +1483,6 @@ public class Dag<N extends Lop>
 				
 				markedNodes.add(node);
 				doRmVar = true;
-				//continue;
 			}
 			else if (node.getExecLocation() == ExecLocation.Data ) {
 				Data dnode = (Data)node;
@@ -1543,13 +1500,11 @@ public class Dag<N extends Lop>
 					if ( dnode.getDataType() == DataType.SCALAR ) {
 						// processing is same for both transient and persistent scalar writes
 						writeInst.addAll(out.getLastInstructions());
-						//inst.addAll(out.getLastInstructions());
 						doRmVar = false;
 					}
 					else {
 						// setupNodeOutputs() handles both transient and persistent matrix writes
 						if ( dnode.isTransient() ) {
-							//inst.addAll(out.getPreInstructions()); // dummy ?
 							deleteInst.addAll(out.getLastInstructions());
 							doRmVar = false;
 						}
@@ -1563,7 +1518,6 @@ public class Dag<N extends Lop>
 						}
 					}
 					markedNodes.add(node);
-					//continue;
 				}
 			}
 			else {
@@ -1586,7 +1540,6 @@ public class Dag<N extends Lop>
 				}
 				markedNodes.add(node);
 				doRmVar = true;
-				//continue;
 			}
 		}
 		
@@ -1662,8 +1615,7 @@ public class Dag<N extends Lop>
 			// Evaluate only those lops that execute in MR.
 			boolean unassigned_inputs = false;
 			for( Lop input : node.getInputs() ) {
-				//if ( input.getExecLocation() != ExecLocation.ControlProgram && jobType(input, jobvec) == -1 ) {
-				if ( input.getExecType() == ExecType.MR && !execNodes.contains(input)) { //jobType(input, jobvec) == -1 ) {
+				if ( input.getExecType() == ExecType.MR && !execNodes.contains(input)) {
 					unassigned_inputs = true;
 					break;
 				}
@@ -1758,9 +1710,6 @@ public class Dag<N extends Lop>
 					 * MapAndReduce (say GMR) -- Inputs coming from two different
 					 * jobs .. GMR & REBLOCK
 					 */
-					//boolean himr = hasOtherMapAndReduceParentNode(tmpNode, execNodes,node);
-					//boolean bcbp = branchCanBePiggyBackedMapAndReduce(tmpNode, node, execNodes, finishedNodes);
-					//System.out.println(" .. " + inputs_in_same_job + "," + himr + "," + bcbp);
 					if ((inputs_in_same_job || unassigned_inputs)
 							&& node.getExecLocation() == ExecLocation.MapAndReduce
 							&& !hasOtherMapAndReduceParentNode(tmpNode, execNodes,node) // don't remove since it already piggybacked with a MapReduce node
@@ -1771,7 +1720,7 @@ public class Dag<N extends Lop>
 					
 					markedNodes.add(tmpNode);
 				}
-			} // for i
+			}
 
 			// we also need to delete all parent nodes of marked nodes
 			for ( Lop enode : execNodes ) {
@@ -2530,9 +2479,6 @@ public class Dag<N extends Lop>
 				String tempVarName = oparams.getLabel() + "temp";
 				String tempFileName = getNextUniqueFilename();
 				
-				//String createInst = prepareVariableInstruction("createvar", tempVarName, node.getDataType(), node.getValueType(), tempFileName, oparams, out.getOutInfo());
-				//out.addPreInstruction(CPInstructionParser.parseSingleInstruction(createInst));
-				
 				int rpb = (int) oparams.getRowsInBlock();
 				int cpb = (int) oparams.getColsInBlock();
 				Instruction createvarInst = VariableCPInstruction.prepareCreateVariableInstruction(
@@ -2570,18 +2516,6 @@ public class Dag<N extends Lop>
 					 * tVarA -> temp21tVarA
 					 */
 					
-					// rename the temp variable to constant variable (e.g., cpvar tVarAtemp tVarA)
-					/*Instruction currInstr = VariableCPInstruction.prepareCopyInstruction(tempVarName, constVarName);
-					if(DMLScript.ENABLE_DEBUG_MODE) {
-						currInstr.setLineNum(node._beginLine);
-					}
-					out.addLastInstruction(currInstr);
-					Instruction tempInstr = VariableCPInstruction.prepareRemoveInstruction(tempVarName);
-					if(DMLScript.ENABLE_DEBUG_MODE) {
-						tempInstr.setLineNum(node._beginLine);
-					}
-					out.addLastInstruction(tempInstr);*/
-					
 					// Generate a single mvvar instruction (e.g., mvvar tempA A)
 					// instead of two instructions "cpvar tempA A" and "rmvar tempA"
 					Instruction currInstr = VariableCPInstruction.prepareMoveInstruction(tempVarName, constVarName);
@@ -2601,9 +2535,6 @@ public class Dag<N extends Lop>
 				// create a variable to hold the result produced by this "rootNode"
 				oparams.setLabel("pVar" + var_index.getNextID() );
 				
-				//String createInst = prepareVariableInstruction("createvar", node);
-				//out.addPreInstruction(CPInstructionParser.parseSingleInstruction(createInst));
-				
 				int rpb = (int) oparams.getRowsInBlock();
 				int cpb = (int) oparams.getColsInBlock();
 				Lop fnameLop = ((Data)node).getNamedInputLop(DataExpression.IO_FILENAME);
@@ -2702,47 +2633,15 @@ public class Dag<N extends Lop>
 			else { //CP PERSISTENT WRITE
 				// generate a write instruction that writes matrix to HDFS
 				Lop fname = ((Data)node).getNamedInputLop(DataExpression.IO_FILENAME);
-				Instruction currInstr = null;
-				Lop inputLop = node.getInputs().get(0);
-				
-				// Case of a transient read feeding into only one output persistent binaryblock write
-				// Move the temporary file on HDFS to required persistent location, insteadof copying.
-				if (inputLop.getExecLocation() == ExecLocation.Data
-						&& inputLop.getOutputs().size() == 1
-						&& ((Data)inputLop).isTransient()
-						&& ((Data)inputLop).getOutputParameters().isBlocked()
-						&& node.getOutputParameters().isBlocked() ) {
-					// transient read feeding into persistent write in blocked representation
-					// simply, move the file
-					
-					//prepare filename (literal or variable in order to support dynamic write)
-					String fnameStr = (fname instanceof Data && ((Data)fname).isLiteral()) ?
-								fname.getOutputParameters().getLabel() : Lop.VARIABLE_NAME_PLACEHOLDER + fname.getOutputParameters().getLabel() + Lop.VARIABLE_NAME_PLACEHOLDER;
-					
-					currInstr = (CPInstruction) VariableCPInstruction.prepareMoveInstruction(
-							inputLop.getOutputParameters().getLabel(),
-							fnameStr, "binaryblock" );
-				}
-				else {
-					
-					String io_inst = node.getInstructions(
-							node.getInputs().get(0).getOutputParameters().getLabel(),
-							fname.getOutputParameters().getLabel());
-					
-					if(node.getExecType() == ExecType.SPARK)
-						// This will throw an exception if the exectype of hop is set incorrectly
-						// Note: the exec type and exec location of lops needs to be set to SPARK and ControlProgram respectively
-						currInstr = SPInstructionParser.parseSingleInstruction(io_inst);
-					else
-						currInstr = CPInstructionParser.parseSingleInstruction(io_inst);
-				}
-				
-				if ( !node.getInputs().isEmpty() && node.getInputs().get(0)._beginLine != 0)
-					currInstr.setLocation(node.getInputs().get(0));
-				else
-					currInstr.setLocation(node);
+				String io_inst = node.getInstructions(
+					node.getInputs().get(0).getOutputParameters().getLabel(),
+					fname.getOutputParameters().getLabel());
+				Instruction currInstr = (node.getExecType() == ExecType.SPARK) ?
+					SPInstructionParser.parseSingleInstruction(io_inst) :
+					CPInstructionParser.parseSingleInstruction(io_inst);
+				currInstr.setLocation((!node.getInputs().isEmpty()
+					&& node.getInputs().get(0)._beginLine != 0) ? node.getInputs().get(0) : node);
 				
 				out.addLastInstruction(currInstr);
 			}
@@ -3439,11 +3338,9 @@ public class Dag<N extends Lop>
 		
 		// cannot reuse index if this is true
 		// need to add better indexing schemes
-		// if (child_for_max_input_index.getOutputs().size() > 1) {
 		output_index = start_index[0];
 		start_index[0]++;
-		// }
-		
+		
 		nodeIndexMapping.put(node, output_index);
 
 		// populate list of input labels.
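
The following is a minimal sketch, for illustration only, of the file-move
distinction behind the first reason given above. It is not part of this patch
and not SystemML code; it uses Hadoop's generic FileSystem API, and the class
and method names (MoveOrCopySketch, moveOrCopy) are hypothetical. Within a
single file system a move can be realized as a rename, i.e., a metadata
operation; across different URI schemes the bytes have to be copied.

    // Illustration only (not part of this patch or of SystemML):
    // a move is a cheap rename only within one file system; with different
    // URI schemes the data must be copied, which for large matrices is
    // exactly where pulling the data through the control program causes OOMs.
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.FileUtil;
    import org.apache.hadoop.fs.Path;

    public class MoveOrCopySketch {
        public static void moveOrCopy(String src, String dst) throws IOException {
            Configuration conf = new Configuration();
            Path srcPath = new Path(src);
            Path dstPath = new Path(dst);
            FileSystem srcFs = srcPath.getFileSystem(conf);
            FileSystem dstFs = dstPath.getFileSystem(conf);
            if (srcFs.getUri().equals(dstFs.getUri())) {
                // same file system (e.g., hdfs:// to hdfs://): a rename is a
                // pure metadata operation, independent of the data size
                srcFs.rename(srcPath, dstPath);
            }
            else {
                // different URI schemes (e.g., hdfs:// to file:): no rename
                // is possible, so the bytes are read and rewritten
                FileUtil.copy(srcFs, srcPath, dstFs, dstPath, true, conf);
            }
        }
    }

The patch sidesteps this distinction for persistent writes altogether:
instead of a mvvar instruction, a regular write instruction is now generated
in all cases (see the simplified CP PERSISTENT WRITE branch in the
@@ -2702 hunk above).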
