Repository: systemml
Updated Branches:
  refs/heads/master d71a7d359 -> a5c834b27


[SYSTEMML-1727] Fix invalid mvvar instruction generation for pwrites

This patch fixes the generation of runtime instructions (piggybacking)
by removing the generation of mvvar instructions for persistent writes
that are the only consumer of a transient read and that write in binary
block format. Mvvar (move variable) instructions are invalid here for
two reasons. First, in case of different file URI schemes, the mvvar
cannot be realized as a rename (a metadata operation) but instead reads
the matrix into CP, which leads to OOMs for large data. Second, in case
of a subsequent use of the variable, the temporary file no longer exists,
which potentially leads to file-not-found exceptions.

This patch fixes these issues by globally disabling the generation of
mvvar instructions for persistent writes. A more fine-grained handling
would be possible but would require a major rewrite of how we currently
generate instructions. This seems unnecessary, given that in CP and
Spark intermediates rarely reside on HDFS anyway.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/9e7ce7ba
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/9e7ce7ba
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/9e7ce7ba

Branch: refs/heads/master
Commit: 9e7ce7ba3fd2871ac9da2f06007b31f3780d37de
Parents: d71a7d3
Author: Matthias Boehm <[email protected]>
Authored: Tue Jun 20 19:41:37 2017 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Tue Jun 20 19:41:37 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/sysml/lops/compile/Dag.java | 129 ++-----------------
 1 file changed, 13 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/9e7ce7ba/src/main/java/org/apache/sysml/lops/compile/Dag.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/lops/compile/Dag.java 
b/src/main/java/org/apache/sysml/lops/compile/Dag.java
index ff83671..f108d2f 100644
--- a/src/main/java/org/apache/sysml/lops/compile/Dag.java
+++ b/src/main/java/org/apache/sysml/lops/compile/Dag.java
@@ -364,10 +364,6 @@ public class Dag<N extends Lop>
                // (currently required for specific cases of external functions)
                for (String varName : sb.liveIn().getVariableNames()) {
                        if (!sb.liveOut().containsVariable(varName)) {
-                               // DataType dt = 
in.getVariable(varName).getDataType();
-                               // if( !(dt==DataType.MATRIX || 
dt==DataType.UNKNOWN) )
-                               // continue; //skip rm instructions for 
non-matrix objects
-
                                inst = 
VariableCPInstruction.prepareRemoveInstruction(varName);
                                inst.setLocation(sb.getEndLine(), 
sb.getEndLine(), -1, -1);
                                
@@ -377,26 +373,6 @@ public class Dag<N extends Lop>
                                        LOG.trace("  Adding " + 
inst.toString());
                        }
                }
-
-               // RULE 2: if in KILL and not in IN and not in OUT, then there 
should be an rmvar or rmfilevar inst
-               // (currently required for specific cases of nested loops)
-               // i.e., local variables which are created within the block, 
and used entirely within the block 
-               /*for (String varName : sb.getKill().getVariableNames()) {
-                       if ((!sb.liveIn().containsVariable(varName))
-                                       && 
(!sb.liveOut().containsVariable(varName))) {
-                               // DataType dt =
-                               // 
sb.getKill().getVariable(varName).getDataType();
-                               // if( !(dt==DataType.MATRIX || 
dt==DataType.UNKNOWN) )
-                               // continue; //skip rm instructions for 
non-matrix objects
-
-                               inst = createCleanupInstruction(varName);
-                               deleteInst.add(inst);
-
-                               if (DMLScript.DEBUG)
-                                       System.out.println("Adding instruction 
(r2) "
-                                                       + inst.toString());
-                       }
-               }*/
        }
 
        private static ArrayList<ArrayList<Lop>> createNodeVectors(int size) {
@@ -709,20 +685,10 @@ public class Dag<N extends Lop>
                Lop in = node.getInputs().get(0);
                Format nodeFormat = node.getOutputParameters().getFormat();
                
-               // Case of a transient read feeding into only one output 
persistent binaryblock write
-               // Move the temporary file on HDFS to required persistent 
location, insteadof copying.
-               if ( in.getExecLocation() == ExecLocation.Data && 
in.getOutputs().size() == 1
-                               && !((Data)node).isTransient()
-                               && ((Data)in).isTransient()
-                               && ((Data)in).getOutputParameters().isBlocked()
-                               && node.getOutputParameters().isBlocked() ) {
-                       return false;
-               }
-               
                //send write lop to MR if (1) it is marked with exec type MR 
(based on its memory estimate), or
                //(2) if the input lop is in MR and the write format allows to 
pack it into the same job (this does
                //not apply to csv write because MR csvwrite is a separate MR 
job type)
-               return (node.getExecType() == ExecType.MR 
+               return (node.getExecType() == ExecType.MR
                        || (in.getExecType() == ExecType.MR && nodeFormat != 
Format.CSV));
        }
        
@@ -986,7 +952,6 @@ public class Dag<N extends Lop>
                                        } else {
                                                // nothing here.. subsequent 
checks have to be performed
                                                // on "node"
-                                               ;
                                        }
                                }
 
@@ -1022,7 +987,6 @@ public class Dag<N extends Lop>
                                                                && 
((Data)input).isTransient() 
                                                                && 
dnode.getOutputParameters().getLabel().equals(input.getOutputParameters().getLabel())
 ) {
                                                        // do nothing, 
<code>node</code> must not processed any further.
-                                                       ;
                                                }
                                                else if ( 
execNodes.contains(input) && !isCompatible(node, input) && 
sendWriteLopToMR(node)) {
                                                        // input is in 
execNodes but it is not compatible with write lop. So, queue the write lop.
@@ -1101,7 +1065,6 @@ public class Dag<N extends Lop>
                                                
                                                // Limit the number of 
distributed cache inputs based on the available memory in mappers
                                                double memsize = 
computeFootprintInMapper(node);
-                                               //gmrMapperFootprint += 
computeFootprintInMapper(node);
                                                if ( gmrMapperFootprint>0 && 
!checkMemoryLimits(node, gmrMapperFootprint+memsize ) ) {
                                                        queueThisNode = true;
                                                        subcode = 2;
@@ -1196,10 +1159,7 @@ public class Dag<N extends Lop>
                        if ( execNodes.isEmpty() ) {
                          
                          if( !queuedNodes.isEmpty() )
-                         {
-                             //System.err.println("Queued nodes should be 0");
-                             throw new LopsException("Queued nodes should not 
be 0 at this point \n");
-                         }
+                                 throw new LopsException("Queued nodes should 
not be 0 at this point \n");
                          
                          if( LOG.isTraceEnabled() )
                                LOG.trace("All done! queuedNodes = "+ 
queuedNodes.size());
@@ -1264,7 +1224,6 @@ public class Dag<N extends Lop>
            // we should not consider such lops in this check
            if (isChild(tmpNode, node, IDMap) 
                        && tmpNode.getExecLocation() != 
ExecLocation.ControlProgram
-                       //&& tmpNode.getCompatibleJobs() != 
LopProperties.INVALID 
                        && (tmpNode.getCompatibleJobs() & 
node.getCompatibleJobs()) == 0)
              return false;
          }
@@ -1278,7 +1237,6 @@ public class Dag<N extends Lop>
         * @param deleteInst list of instructions
         */
        private static void excludeRemoveInstruction(String varName, 
ArrayList<Instruction> deleteInst) {
-               //for(Instruction inst : deleteInst) {
                for(int i=0; i < deleteInst.size(); i++) {
                        Instruction inst = deleteInst.get(i);
                        if ((inst.getType() == INSTRUCTION_TYPE.CONTROL_PROGRAM 
 || inst.getType() == INSTRUCTION_TYPE.SPARK)
@@ -1525,7 +1483,6 @@ public class Dag<N extends Lop>
 
                                markedNodes.add(node);
                                doRmVar = true;
-                               //continue;
                        }
                        else if (node.getExecLocation() == ExecLocation.Data ) {
                                Data dnode = (Data)node;
@@ -1543,13 +1500,11 @@ public class Dag<N extends Lop>
                                                if ( dnode.getDataType() == 
DataType.SCALAR ) {
                                                        // processing is same 
for both transient and persistent scalar writes 
                                                        
writeInst.addAll(out.getLastInstructions());
-                                                       
//inst.addAll(out.getLastInstructions());
                                                        doRmVar = false;
                                                }
                                                else {
                                                        // setupNodeOutputs() 
handles both transient and persistent matrix writes 
                                                        if ( 
dnode.isTransient() ) {
-                                                               
//inst.addAll(out.getPreInstructions()); // dummy ?
                                                                
deleteInst.addAll(out.getLastInstructions());
                                                                doRmVar = false;
                                                        }
@@ -1563,7 +1518,6 @@ public class Dag<N extends Lop>
                                                        }
                                                }
                                                markedNodes.add(node);
-                                               //continue;
                                        }
                                }
                                else {
@@ -1586,7 +1540,6 @@ public class Dag<N extends Lop>
                                        }
                                        markedNodes.add(node);
                                        doRmVar = true;
-                                       //continue;
                                }
                        }
                        
@@ -1662,8 +1615,7 @@ public class Dag<N extends Lop>
                // Evaluate only those lops that execute in MR.
                boolean unassigned_inputs = false;
                for( Lop input : node.getInputs() ) {
-                       //if ( input.getExecLocation() != 
ExecLocation.ControlProgram && jobType(input, jobvec) == -1 ) {
-                       if ( input.getExecType() == ExecType.MR && 
!execNodes.contains(input)) { //jobType(input, jobvec) == -1 ) {
+                       if ( input.getExecType() == ExecType.MR && 
!execNodes.contains(input)) { 
                                unassigned_inputs = true;
                                break;
                        }
@@ -1758,9 +1710,6 @@ public class Dag<N extends Lop>
                                 * MapAndReduce (say GMR) -- Inputs coming from 
two different
                                 * jobs .. GMR & REBLOCK
                                 */
-                               //boolean himr = 
hasOtherMapAndReduceParentNode(tmpNode, execNodes,node);
-                               //boolean bcbp = 
branchCanBePiggyBackedMapAndReduce(tmpNode, node, execNodes, finishedNodes);
-                               //System.out.println("      .. " + 
inputs_in_same_job + "," + himr + "," + bcbp);
                                if ((inputs_in_same_job || unassigned_inputs)
                                                && node.getExecLocation() == 
ExecLocation.MapAndReduce
                                                && 
!hasOtherMapAndReduceParentNode(tmpNode, execNodes,node)  // don't remove since 
it already piggybacked with a MapReduce node
@@ -1771,7 +1720,7 @@ public class Dag<N extends Lop>
 
                                        markedNodes.add(tmpNode);
                                }
-               } // for i
+               } 
 
                // we also need to delete all parent nodes of marked nodes
                for ( Lop enode : execNodes ) {
@@ -2530,9 +2479,6 @@ public class Dag<N extends Lop>
                                                String tempVarName = 
oparams.getLabel() + "temp";
                                                String tempFileName = 
getNextUniqueFilename();
                                                
-                                               //String createInst = 
prepareVariableInstruction("createvar", tempVarName, node.getDataType(), 
node.getValueType(), tempFileName, oparams, out.getOutInfo());
-                                               
//out.addPreInstruction(CPInstructionParser.parseSingleInstruction(createInst));
-                                               
                                                int rpb = (int) 
oparams.getRowsInBlock();
                                                int cpb = (int) 
oparams.getColsInBlock();
                                                Instruction createvarInst = 
VariableCPInstruction.prepareCreateVariableInstruction(
@@ -2570,18 +2516,6 @@ public class Dag<N extends Lop>
                                                 *     tVarA -> temp21tVarA
                                                 */
                                                
-                                               // rename the temp variable to 
constant variable (e.g., cpvar tVarAtemp tVarA)
-                                               /*Instruction currInstr = 
VariableCPInstruction.prepareCopyInstruction(tempVarName, constVarName);
-                                               if(DMLScript.ENABLE_DEBUG_MODE) 
{
-                                                       
currInstr.setLineNum(node._beginLine);
-                                               }
-                                               
out.addLastInstruction(currInstr);
-                                               Instruction tempInstr = 
VariableCPInstruction.prepareRemoveInstruction(tempVarName);
-                                               if(DMLScript.ENABLE_DEBUG_MODE) 
{
-                                                       
tempInstr.setLineNum(node._beginLine);
-                                               }
-                                               
out.addLastInstruction(tempInstr);*/
-
                                                // Generate a single mvvar 
instruction (e.g., mvvar tempA A) 
                                                //    instead of two 
instructions "cpvar tempA A" and "rmvar tempA"
                                                Instruction currInstr = 
VariableCPInstruction.prepareMoveInstruction(tempVarName, constVarName);
@@ -2601,9 +2535,6 @@ public class Dag<N extends Lop>
                                                // create a variable to hold 
the result produced by this "rootNode"
                                                oparams.setLabel("pVar" + 
var_index.getNextID() );
                                                
-                                               //String createInst = 
prepareVariableInstruction("createvar", node);
-                                               
//out.addPreInstruction(CPInstructionParser.parseSingleInstruction(createInst));
-
                                                int rpb = (int) 
oparams.getRowsInBlock();
                                                int cpb = (int) 
oparams.getColsInBlock();
                                                Lop fnameLop = 
((Data)node).getNamedInputLop(DataExpression.IO_FILENAME);
@@ -2702,47 +2633,15 @@ public class Dag<N extends Lop>
                                        else { //CP PERSISTENT WRITE
                                                // generate a write instruction 
that writes matrix to HDFS
                                                Lop fname = 
((Data)node).getNamedInputLop(DataExpression.IO_FILENAME);
-                                               Instruction currInstr = null;
-                                               Lop inputLop = 
node.getInputs().get(0);
-                                               
-                                               // Case of a transient read 
feeding into only one output persistent binaryblock write
-                                               // Move the temporary file on 
HDFS to required persistent location, insteadof copying.
-                                               if (inputLop.getExecLocation() 
== ExecLocation.Data
-                                                               && 
inputLop.getOutputs().size() == 1
-                                                               && 
((Data)inputLop).isTransient() 
-                                                               && 
((Data)inputLop).getOutputParameters().isBlocked()
-                                                               && 
node.getOutputParameters().isBlocked() ) {
-                                                       // transient read 
feeding into persistent write in blocked representation
-                                                       // simply, move the file
-                                                       
-                                                       //prepare filename 
(literal or variable in order to support dynamic write) 
-                                                       String fnameStr = 
(fname instanceof Data && ((Data)fname).isLiteral()) ? 
-                                                                               
   fname.getOutputParameters().getLabel() 
-                                                                               
   : Lop.VARIABLE_NAME_PLACEHOLDER + fname.getOutputParameters().getLabel() + 
Lop.VARIABLE_NAME_PLACEHOLDER;
-                                                       
-                                                       currInstr = 
(CPInstruction) VariableCPInstruction.prepareMoveInstruction(
-                                                                               
        inputLop.getOutputParameters().getLabel(), 
-                                                                               
        fnameStr, "binaryblock" );
-                                               }
-                                               else {
-                                                       
-                                                       String io_inst = 
node.getInstructions(
-                                                                       
node.getInputs().get(0).getOutputParameters().getLabel(), 
-                                                                       
fname.getOutputParameters().getLabel());
-                                                       
-                                                       if(node.getExecType() 
== ExecType.SPARK)
-                                                               // This will 
throw an exception if the exectype of hop is set incorrectly
-                                                               // Note: the 
exec type and exec location of lops needs to be set to SPARK and ControlProgram 
respectively
-                                                               currInstr = 
SPInstructionParser.parseSingleInstruction(io_inst);
-                                                       else
-                                                               currInstr = 
CPInstructionParser.parseSingleInstruction(io_inst);
-                                               }
-
                                                
-                                               if ( 
!node.getInputs().isEmpty() && node.getInputs().get(0)._beginLine != 0)
-                                                       
currInstr.setLocation(node.getInputs().get(0));
-                                               else
-                                                       
currInstr.setLocation(node);
+                                               String io_inst = 
node.getInstructions(
+                                                       
node.getInputs().get(0).getOutputParameters().getLabel(), 
+                                                       
fname.getOutputParameters().getLabel());
+                                               Instruction currInstr = 
(node.getExecType() == ExecType.SPARK) ?
+                                                       
SPInstructionParser.parseSingleInstruction(io_inst) :
+                                                       
CPInstructionParser.parseSingleInstruction(io_inst);
+                                               
currInstr.setLocation((!node.getInputs().isEmpty() 
+                                                       && 
node.getInputs().get(0)._beginLine != 0) ? node.getInputs().get(0) : node);
                                                
                                                
out.addLastInstruction(currInstr);
                                        }
@@ -3439,11 +3338,9 @@ public class Dag<N extends Lop>
 
                        // cannot reuse index if this is true
                        // need to add better indexing schemes
-                       // if (child_for_max_input_index.getOutputs().size() > 
1) {
                        output_index = start_index[0];
                        start_index[0]++;
-                       // }
-
+                       
                        nodeIndexMapping.put(node, output_index);
 
                        // populate list of input labels.

Reply via email to