Repository: systemml
Updated Branches:
  refs/heads/master e270960ca -> 55ce4853c


http://git-wip-us.apache.org/repos/asf/systemml/blob/55ce4853/src/main/java/org/apache/sysml/lops/compile/Dag.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/lops/compile/Dag.java 
b/src/main/java/org/apache/sysml/lops/compile/Dag.java
index bc080d3..791fa01 100644
--- a/src/main/java/org/apache/sysml/lops/compile/Dag.java
+++ b/src/main/java/org/apache/sysml/lops/compile/Dag.java
@@ -21,10 +21,15 @@ package org.apache.sysml.lops.compile;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 import org.apache.commons.logging.Log;
@@ -105,26 +110,21 @@ public class Dag<N extends Lop>
        private int total_reducers = -1;
        private String scratch = "";
        private String scratchFilePath = null;
+
+       // list of all nodes in the dag
+       private ArrayList<Lop> nodes = null;
        
+       // Hashmap that translates the nodes in the DAG to a sequence of numbers
+       //     key:   Lop ID
+       //     value: Sequence Number (0 ... |DAG|)
+       // This map is primarily used in performing DFS on the DAG, and 
subsequently in performing ancestor-descendant checks.
+       private HashMap<Long, Integer> IDMap = null;
        private double gmrMapperFootprint = 0;
-       
+
        static {
                job_id = new IDSequence();
                var_index = new IDSequence();
        }
-       
-       // list of all nodes in the dag
-       private ArrayList<Lop> nodes = null;
-       
-       /* 
-        * Hashmap to translates the nodes in the DAG to a sequence of numbers
-        *     key:   Lop ID
-        *     value: Sequence Number (0 ... |DAG|)
-        *     
-        * This map is primarily used in performing DFS on the DAG, and 
subsequently in performing ancestor-descendant checks.
-        */
-       private HashMap<Long, Integer> IDMap = null;
-
 
        private static class NodeOutput {
                String fileName;
@@ -196,9 +196,9 @@ public class Dag<N extends Lop>
        private String getFilePath() {
                if ( scratchFilePath == null ) {
                        scratchFilePath = scratch + Lop.FILE_SEPARATOR
-                                                               + 
Lop.PROCESS_PREFIX + DMLScript.getUUID()
-                                                               + 
Lop.FILE_SEPARATOR + Lop.FILE_SEPARATOR
-                                                               + 
ProgramConverter.CP_ROOT_THREAD_ID + Lop.FILE_SEPARATOR;
+                               + Lop.PROCESS_PREFIX + DMLScript.getUUID()
+                               + Lop.FILE_SEPARATOR + Lop.FILE_SEPARATOR
+                               + ProgramConverter.CP_ROOT_THREAD_ID + 
Lop.FILE_SEPARATOR;
                }
                return scratchFilePath;
        }
@@ -226,10 +226,9 @@ public class Dag<N extends Lop>
         * @param node low-level operator
         * @return true if node was not already present, false if not.
         */
-
        public boolean addNode(Lop node) {
                if (nodes.contains(node))
-                       return false;           
+                       return false;
                nodes.add(node);
                return true;
        }
@@ -249,696 +248,282 @@ public class Dag<N extends Lop>
                
                // create ordering of lops (for MR, we sort by level, while for 
all
                // other exec types we use a two-level sorting of )
-               ArrayList<Lop> node_v = OptimizerUtils.isHadoopExecutionMode() ?
+               List<Lop> node_v = OptimizerUtils.isHadoopExecutionMode() ?
                        doTopologicalSortStrictOrder(nodes) :
                        doTopologicalSortTwoLevelOrder(nodes);
                
                // do greedy grouping of operations
-               ArrayList<Instruction> inst = doGreedyGrouping(sb, node_v);
+               ArrayList<Instruction> inst = 
OptimizerUtils.isHadoopExecutionMode() ?
+                       doGreedyGrouping(sb, node_v) : 
doPlainInstructionGen(sb, node_v);
                
                // cleanup instruction (e.g., create packed rmvar instructions)
-               inst = cleanupInstructions(inst);
+               return cleanupInstructions(inst);
+       }
+       
+       
+       /**
+        * Sort the lops by topological order.
+        * 
+        *  1) All nodes with level i appear prior to the nodes in level i+1.
+        *  2) All nodes within a level are ordered by their ID i.e., in the 
order
+        *  they are created
+        * 
+        * @param v list of lops
+        */
+       @SuppressWarnings({ "unchecked", "rawtypes" })
+       private List<Lop> doTopologicalSortStrictOrder(List<Lop> v) {
+               /*
+                * Step 1: compute the level for each node in the DAG. Level 
for each node is 
+                *   computed as lops are created. So, this step need not be 
performed here.
+                * Step 2: sort the nodes by level, and within a level by node 
ID.
+                */
                
-               return inst;
+               // Step1: Performed at the time of creating Lops
+               
+               // Step2: sort nodes by level, and then by node ID
+               Lop[] nodearray = v.toArray(new Lop[0]);
+               Arrays.sort(nodearray, new LopComparator());
 
+               return createIDMapping(nodearray);
        }
+       
+       private List<Lop> doTopologicalSortTwoLevelOrder(List<Lop> v) {
+               //partition nodes into leaf/inner nodes and dag root nodes,
+               //+ sort leaf/inner nodes by ID to force depth-first scheduling
+               //+ append root nodes in order of their original definition 
+               //  (which also preserves the original order of prints)
+               List<Lop> nodes = Stream.concat(
+                       v.stream().filter(l -> 
!l.getOutputs().isEmpty()).sorted(Comparator.comparing(l -> l.getID())),
+                       v.stream().filter(l -> 
l.getOutputs().isEmpty())).collect(Collectors.toList());
 
-       private static void deleteUpdatedTransientReadVariables(StatementBlock 
sb, ArrayList<Lop> nodeV, ArrayList<Instruction> inst) {
-               if ( sb == null ) 
-                       return;
-               
-               if( LOG.isTraceEnabled() )
-                       LOG.trace("In delete updated variables");
-
-               // CANDIDATE list of variables which could have been updated in 
this statement block 
-               HashMap<String, Lop> labelNodeMapping = new HashMap<>();
-               
-               // ACTUAL list of variables whose value is updated, AND the old 
value of the variable 
-               // is no longer accessible/used.
-               HashSet<String> updatedLabels = new HashSet<>();
-               HashMap<String, Lop> updatedLabelsLineNum =  new HashMap<>();
+               //NOTE: in contrast to hadoop execution modes, we avoid 
computing the transitive
+               //closure here to ensure linear time complexity because it's 
unnecessary for CP and Spark
+               return nodes;
+       }
+       
+       private List<Lop> createIDMapping(Lop[] nodearray) {
+               // Copy sorted nodes into "ret" and construct a mapping between 
Lop IDs and sequence numbers
+               ArrayList<Lop> ret = new ArrayList<>();
+               IDMap.clear();
+               for (int i = 0; i < nodearray.length; i++) {
+                       ret.add(nodearray[i]);
+                       IDMap.put(nodearray[i].getID(), i);
+               }
                
-               // first capture all transient read variables
-               for ( Lop node : nodeV ) {
+               // Compute the all-pair reachability graph (transitive closure) 
of the DAG.
+               // - Perform a depth-first search (DFS) from every node $u$ in 
the DAG 
+               // - and construct the list of reachable nodes from the node $u$
+               // - store the constructed reachability information in 
$u$.reachable[] boolean array
+               for (int i = 0; i < nodearray.length; i++) {
+                       dagDFS(nodearray[i], nodearray[i]
+                               .create_reachable(nodearray.length));
+               }
 
-                       if (node.getExecLocation() == ExecLocation.Data
-                                       && ((Data) node).isTransient()
-                                       && ((Data) node).getOperationType() == 
OperationTypes.READ
-                                       && ((Data) node).getDataType() == 
DataType.MATRIX) {
-                               
-                               // "node" is considered as updated ONLY IF the 
old value is not used any more
-                               // So, make sure that this READ node does not 
feed into any (transient/persistent) WRITE
-                               boolean hasWriteParent=false;
-                               for(Lop p : node.getOutputs()) {
-                                       if(p.getExecLocation() == 
ExecLocation.Data) {
-                                               // if the "p" is of type Data, 
then it has to be a WRITE
-                                               hasWriteParent = true;
-                                               break;
-                                       }
-                               }
-                               
-                               if ( !hasWriteParent ) {
-                                       // node has no parent of type WRITE, so 
this is a CANDIDATE variable 
-                                       // add it to labelNodeMapping so that 
it is considered in further processing  
-                                       
labelNodeMapping.put(node.getOutputParameters().getLabel(), node);
+               // print the nodes in sorted order
+               if (LOG.isTraceEnabled()) {
+                       for ( Lop vnode : ret ) {
+                               StringBuilder sb = new StringBuilder();
+                               sb.append(vnode.getID());
+                               sb.append("(");
+                               sb.append(vnode.getLevel());
+                               sb.append(") ");
+                               sb.append(vnode.getType());
+                               sb.append("(");
+                               for(Lop vin : vnode.getInputs()) {
+                                       sb.append(vin.getID());
+                                       sb.append(",");
                                }
+                               sb.append("), ");
+                               LOG.trace(sb.toString());
                        }
+                       LOG.trace("topological sort -- done");
                }
+               return ret;
+       }
+       
+       
+       /**
+        * Method to group a vector of sorted lops.
+        * 
+        * @param sb statement block
+        * @param node_v list of low-level operators
+        * @return list of instructions
+        */
+       private ArrayList<Instruction> doGreedyGrouping(StatementBlock sb, 
List<Lop> node_v)
+       {
+               if( LOG.isTraceEnabled() )
+                       LOG.trace("Grouping DAG ============");
 
-               // capture updated transient write variables
-               for ( Lop node : nodeV ) {
+               // nodes to be executed in current iteration
+               List<Lop> execNodes = new ArrayList<>();
+               // nodes that have already been processed
+               List<Lop> finishedNodes = new ArrayList<>();
+               // nodes that are queued for the following iteration
+               List<Lop> queuedNodes = new ArrayList<>();
 
-                       if (node.getExecLocation() == ExecLocation.Data
-                                       && ((Data) node).isTransient()
-                                       && ((Data) node).getOperationType() == 
OperationTypes.WRITE
-                                       && ((Data) node).getDataType() == 
DataType.MATRIX
-                                       && 
labelNodeMapping.containsKey(node.getOutputParameters().getLabel()) // check to 
make sure corresponding (i.e., with the same label/name) transient read is 
present
-                                       && 
!labelNodeMapping.containsValue(node.getInputs().get(0)) // check to avoid 
cases where transient read feeds into a transient write 
-                               ) {
-                               
updatedLabels.add(node.getOutputParameters().getLabel());
-                               
updatedLabelsLineNum.put(node.getOutputParameters().getLabel(), node);
-                               
-                       }
-               }
+               List<List<Lop>> jobNodes = 
createNodeVectors(JobType.getNumJobTypes());
                
-               // generate RM instructions
-               Instruction rm_inst = null;
-               for ( String label : updatedLabels ) 
-               {
-                       rm_inst = 
VariableCPInstruction.prepareRemoveInstruction(label);
-                       rm_inst.setLocation(updatedLabelsLineNum.get(label));
-                       
-                       if( LOG.isTraceEnabled() )
-                               LOG.trace(rm_inst.toString());
-                       inst.add(rm_inst);
-               }
-
-       }
-         
-       private static void generateRemoveInstructions(StatementBlock sb, 
ArrayList<Instruction> deleteInst) {
+               //prepare basic instruction sets
+               List<Instruction> deleteInst = new ArrayList<>();
+               List<Instruction> writeInst = 
deleteUpdatedTransientReadVariables(sb, node_v);
+               List<Instruction> endOfBlockInst = 
generateRemoveInstructions(sb);
+               ArrayList<Instruction> inst = 
generateInstructionsForInputVariables(node_v);
                
-               if ( sb == null ) 
-                       return;
+               boolean done = false;
+               String indent = "    ";
 
-               if( LOG.isTraceEnabled() )
-                       LOG.trace("In generateRemoveInstructions()");
-               
-               Instruction inst = null;
-               // RULE 1: if in IN and not in OUT, then there should be an 
rmvar or rmfilevar inst
-               // (currently required for specific cases of external functions)
-               for (String varName : sb.liveIn().getVariableNames()) {
-                       if (!sb.liveOut().containsVariable(varName)) {
-                               inst = 
VariableCPInstruction.prepareRemoveInstruction(varName);
-                               inst.setLocation(sb.getFilename(), 
sb.getEndLine(), sb.getEndLine(), -1, -1);
-                               
-                               deleteInst.add(inst);
+               while (!done) {
+                       if( LOG.isTraceEnabled() )
+                               LOG.trace("Grouping nodes in DAG");
 
-                               if( LOG.isTraceEnabled() )
-                                       LOG.trace("  Adding " + 
inst.toString());
-                       }
-               }
-       }
+                       execNodes.clear();
+                       queuedNodes.clear();
+                       clearNodeVectors(jobNodes);
+                       gmrMapperFootprint=0;
 
-       private static ArrayList<ArrayList<Lop>> createNodeVectors(int size) {
-               ArrayList<ArrayList<Lop>> arr = new ArrayList<>();
+                       for ( Lop  node : node_v ) {
+                               // finished nodes don't need to be processed
+                               if (finishedNodes.contains(node))
+                                       continue;
 
-               // for each job type, we need to create a vector.
-               // additionally, create another vector for execNodes
-               for (int i = 0; i < size; i++) {
-                       arr.add(new ArrayList<Lop>());
-               }
-               return arr;
-       }
+                               if( LOG.isTraceEnabled() )
+                                       LOG.trace("Processing node (" + 
node.getID()
+                                                       + ") " + 
node.toString() + " exec nodes size is " + execNodes.size());
+                               
+                               
+                       //if node defines MR job, make sure it is compatible 
with all 
+                       //its children nodes in execNodes 
+                       if(node.definesMRJob() && 
!compatibleWithChildrenInExecNodes(execNodes, node))
+                       {
+                               if( LOG.isTraceEnabled() )                      
+                                 LOG.trace(indent + "Queueing node "
+                                       + node.toString() + " (code 1)");
+                       
+                         queuedNodes.add(node);
+                         removeNodesForNextIteration(node, finishedNodes, 
execNodes, queuedNodes, jobNodes);
+                         continue;
+                       }
 
-       private static void clearNodeVectors(ArrayList<ArrayList<Lop>> arr) {
-               for (ArrayList<Lop> tmp : arr) {
-                       tmp.clear();
-               }
-       }
+                               // if child is queued, this node will be 
processed in a later
+                               // iteration
+                               if (hasChildNode(node,queuedNodes)) {
 
-       private static boolean isCompatible(ArrayList<Lop> nodes, JobType jt, 
int from, int to) {
-               int base = jt.getBase();
-               for ( Lop node : nodes ) {
-                       if ((node.getCompatibleJobs() & base) == 0) {
-                               if( LOG.isTraceEnabled() )
-                                       LOG.trace("Not compatible "+ 
node.toString());
-                               return false;
-                       }
-               }
-               return true;
-       }
+                                       if( LOG.isTraceEnabled() )
+                                               LOG.trace(indent + "Queueing 
node "
+                                                               + 
node.toString() + " (code 2)");
+                                       queuedNodes.add(node);
 
-       /**
-        * Function that determines if the two input nodes can be executed 
together 
-        * in at least one job.
-        * 
-        * @param node1 low-level operator 1
-        * @param node2 low-level operator 2
-        * @return true if nodes can be executed together
-        */
-       private static boolean isCompatible(Lop node1, Lop node2) {
-               return( (node1.getCompatibleJobs() & node2.getCompatibleJobs()) 
> 0);
-       }
-       
-       /**
-        * Function that checks if the given node executes in the job specified 
by jt.
-        * 
-        * @param node low-level operator
-        * @param jt job type
-        * @return true if node executes in the specified job type
-        */
-       private static boolean isCompatible(Lop node, JobType jt) {
-               if ( jt == JobType.GMRCELL )
-                       jt = JobType.GMR;
-               return ((node.getCompatibleJobs() & jt.getBase()) > 0);
-       }
+                                       // if node has more than two inputs,
+                                       // remove children that will be needed 
in future
+                                       // iterations
+                                       // may also have to remove parent nodes 
of these children
+                                       removeNodesForNextIteration(node, 
finishedNodes, execNodes,
+                                                       queuedNodes, jobNodes);
 
-       /*
-        * Add node, and its relevant children to job-specific node vectors.
-        */
-       private void addNodeByJobType(Lop node, ArrayList<ArrayList<Lop>> arr,
-                       ArrayList<Lop> execNodes, boolean eliminate) {
-               
-               if (!eliminate) {
-                       // Check if this lop defines a MR job.
-                       if ( node.definesMRJob() ) {
+                                       continue;
+                               }
                                
-                               // find the corresponding JobType
-                               JobType jt = JobType.findJobTypeFromLop(node);
-                               
-                               if ( jt == null ) {
-                                       throw new 
LopsException(node.printErrorLocation() + "No matching JobType is found for a 
the lop type: " + node.getType() + " \n");
-                               }
-                               
-                               // Add "node" to corresponding job vector
-                               
-                               if ( jt == JobType.GMR ) {
-                                       if ( node.hasNonBlockedInputs() ) {
-                                               int gmrcell_index = 
JobType.GMRCELL.getId();
-                                               
arr.get(gmrcell_index).add(node);
-                                               int from = 
arr.get(gmrcell_index).size();
-                                               addChildren(node, 
arr.get(gmrcell_index), execNodes);
-                                               int to = 
arr.get(gmrcell_index).size();
-                                               if 
(!isCompatible(arr.get(gmrcell_index),JobType.GMR, from, to))  // check against 
GMR only, not against GMRCELL
-                                                       throw new 
LopsException(node.printErrorLocation() + "Error during compatibility check 
\n");
-                                       }
-                                       else {
-                                               // if "node" (in this case, a 
group lop) has any inputs from RAND 
-                                               // then add it to RAND job. 
Otherwise, create a GMR job
-                                               if (hasChildNode(node, 
arr.get(JobType.DATAGEN.getId()) )) {
-                                                       
arr.get(JobType.DATAGEN.getId()).add(node);
-                                                       // we should NOT call 
'addChildren' because appropriate
-                                                       // child nodes would 
have got added to RAND job already
-                                               } else {
-                                                       int gmr_index = 
JobType.GMR.getId();
-                                                       
arr.get(gmr_index).add(node);
-                                                       int from = 
arr.get(gmr_index).size();
-                                                       addChildren(node, 
arr.get(gmr_index), execNodes);
-                                                       int to = 
arr.get(gmr_index).size();
-                                                       if 
(!isCompatible(arr.get(gmr_index),JobType.GMR, from, to)) 
-                                                               throw new 
LopsException(node.printErrorLocation() + "Error during compatibility check 
\n");
+                               // if inputs come from different jobs, then 
queue
+                               if ( node.getInputs().size() >= 2) {
+                                       int jobid = Integer.MIN_VALUE;
+                                       boolean queueit = false;
+                                       for(int idx=0; idx < 
node.getInputs().size(); idx++) {
+                                               int input_jobid = 
jobType(node.getInputs().get(idx), jobNodes);
+                                               if (input_jobid != -1) {
+                                                       if ( jobid == 
Integer.MIN_VALUE )
+                                                               jobid = 
input_jobid;
+                                                       else if ( jobid != 
input_jobid ) { 
+                                                               queueit = true;
+                                                               break;
+                                                       }
                                                }
                                        }
-                               }
-                               else {
-                                       int index = jt.getId();
-                                       arr.get(index).add(node);
-                                       int from = arr.get(index).size();
-                                       addChildren(node, arr.get(index), 
execNodes);
-                                       int to = arr.get(index).size();
-                                       // check if all added nodes are 
compatible with current job
-                                       if (!isCompatible(arr.get(index), jt, 
from, to)) {
-                                               throw new LopsException( 
-                                                               "Unexpected 
error in addNodeByType.");
+                                       if ( queueit ) {
+                                               if( LOG.isTraceEnabled() )
+                                                       LOG.trace(indent + 
"Queueing node " + node.toString() + " (code 3)");
+                                               queuedNodes.add(node);
+                                               
removeNodesForNextIteration(node, finishedNodes, execNodes, queuedNodes, 
jobNodes);
+                                               continue;
                                        }
                                }
-                               return;
-                       }
-               }
-
-               if ( eliminate ) {
-                       // Eliminated lops are directly added to GMR queue. 
-                       // Note that eliminate flag is set only for 'group' lops
-                       if ( node.hasNonBlockedInputs() )
-                               arr.get(JobType.GMRCELL.getId()).add(node);
-                       else
-                               arr.get(JobType.GMR.getId()).add(node);
-                       return;
-               }
-               
-               /*
-                * If this lop does not define a job, check if it uses the 
output of any
-                * specialized job. i.e., if this lop has a child node in any 
of the
-                * job-specific vector, then add it to the vector. Note: This 
lop must
-                * be added to ONLY ONE of the job-specific vectors.
-                */
 
-               int numAdded = 0;
-               for ( JobType j : JobType.values() ) {
-                       if ( j.getId() > 0 && hasDirectChildNode(node, 
arr.get(j.getId()))) {
-                               if (isCompatible(node, j)) {
-                                       arr.get(j.getId()).add(node);
-                                       numAdded += 1;
+                               // See if this lop can be eliminated
+                               // This check is for "aligner" lops (e.g., 
group)
+                               boolean eliminate = false;
+                               eliminate = canEliminateLop(node, execNodes);
+                               if (eliminate) {
+                                       if( LOG.isTraceEnabled() )
+                                               LOG.trace(indent + "Adding -"+ 
node.toString());
+                                       execNodes.add(node);
+                                       finishedNodes.add(node);
+                                       addNodeByJobType(node, jobNodes, 
execNodes, eliminate);
+                                       continue;
                                }
-                       }
-               }
-               if (numAdded > 1) {
-                       throw new LopsException("Unexpected error in 
addNodeByJobType(): A given lop can ONLY be added to a single job vector 
(numAdded = " + numAdded + ")." );
-               }
-       }
 
-       /*
-        * Remove the node from all job-specific node vectors. This method is
-        * invoked from removeNodesForNextIteration().
-        */
-       private static void removeNodeByJobType(Lop node, 
ArrayList<ArrayList<Lop>> arr) {
-               for ( JobType jt : JobType.values())
-                       if ( jt.getId() > 0 ) 
-                               arr.get(jt.getId()).remove(node);
-       }
+                               // If the node defines a MR Job then make sure 
none of its
+                               // children that define a MR Job are present 
in execNodes
+                               if (node.definesMRJob()) {
+                                       if (hasMRJobChildNode(node, execNodes)) 
{
+                                               // "node" must NOT be queued 
when node=group and the child that defines job is Rand
+                                               // this is because "group" can 
be pushed into the "Rand" job.
+                                               if (! (node.getType() == 
Lop.Type.Grouping && checkDataGenAsChildNode(node,execNodes))  ) {
+                                                       if( 
LOG.isTraceEnabled() )
+                                                               
LOG.trace(indent + "Queueing node " + node.toString() + " (code 4)");
 
-       /**
-        * As some jobs only write one output, all operations in the mapper 
need to
-        * be redone and cannot be marked as finished.
-        * 
-        * @param execNodes list of exec low-level operators
-        * @param jobNodes list of job low-level operators
-        * @param finishedNodes list of finished low-level operators
-        */
-       private void handleSingleOutputJobs(ArrayList<Lop> execNodes,
-                       ArrayList<ArrayList<Lop>> jobNodes, ArrayList<Lop> 
finishedNodes)
-       {
-               /*
-                * If the input of a MMCJ/MMRJ job (must have executed in a 
Mapper) is used
-                * by multiple lops then we should mark it as not-finished.
-                */
-               ArrayList<Lop> nodesWithUnfinishedOutputs = new ArrayList<>();
-               int[] jobIndices = {JobType.MMCJ.getId()};
-               Lop.Type[] lopTypes = { Lop.Type.MMCJ};
-               
-               // TODO: SortByValue should be treated similar to MMCJ, since 
it can
-               // only sort one file now
-               
-               for ( int jobi=0; jobi < jobIndices.length; jobi++ ) {
-                       int jindex = jobIndices[jobi];
-                       if (!jobNodes.get(jindex).isEmpty()) {
-                               ArrayList<Lop> vec = jobNodes.get(jindex);
+                                                       queuedNodes.add(node);
 
-                               // first find all nodes with more than one 
parent that is not finished.
-                               for (int i = 0; i < vec.size(); i++) {
-                                       Lop node = vec.get(i);
-                                       if (node.getExecLocation() == 
ExecLocation.MapOrReduce
-                                                       || 
node.getExecLocation() == ExecLocation.Map) {
-                                               Lop MRparent = 
getParentNode(node, execNodes, ExecLocation.MapAndReduce);
-                                               if ( MRparent != null && 
MRparent.getType() == lopTypes[jobi]) {
-                                                       int numParents = 
node.getOutputs().size();
-                                                       if (numParents > 1) {
-                                                               for (int j = 0; 
j < numParents; j++) {
-                                                                       if 
(!finishedNodes.contains(node.getOutputs()
-                                                                               
        .get(j)))
-                                                                               
nodesWithUnfinishedOutputs.add(node);
-                                                               }
-       
-                                                       }
+                                                       
removeNodesForNextIteration(node, finishedNodes,
+                                                                       
execNodes, queuedNodes, jobNodes);
+
+                                                       continue;
                                                }
-                                       } 
+                                       }
                                }
 
-                               // need to redo all nodes in nodesWithOutput as 
well as their children
-                               for ( Lop node : vec ) {
-                                       if (node.getExecLocation() == 
ExecLocation.MapOrReduce
-                                                       || 
node.getExecLocation() == ExecLocation.Map) {
-                                               if 
(nodesWithUnfinishedOutputs.contains(node))
-                                                       
finishedNodes.remove(node);
+                               // if "node" has more than one input, and has a 
descendant lop
+                               // in execNodes that is of type RecordReader
+                               // then all its inputs must be ancestors of 
RecordReader. If
+                               // not, queue "node"
+                               if (node.getInputs().size() > 1
+                                               && hasChildNode(node, 
execNodes, ExecLocation.RecordReader)) {
+                                       // get the actual RecordReader lop
+                                       Lop rr_node = getChildNode(node, 
execNodes, ExecLocation.RecordReader);
 
-                                               if (hasParentNode(node, 
nodesWithUnfinishedOutputs))
-                                                       
finishedNodes.remove(node);
+                                       // all inputs of "node" must be 
ancestors of rr_node
+                                       boolean queue_it = false;
+                                       for (Lop n : node.getInputs()) {
+                                               // each input should be 
ancestor of RecordReader lop
+                                               if (!n.equals(rr_node) && 
!isChild(rr_node, n, IDMap)) {
+                                                       queue_it = true; // 
i.e., "node" must be queued
+                                                       break;
+                                               }
+                                       }
 
+                                       if (queue_it) {
+                                               // queue node
+                                               if( LOG.isTraceEnabled() )
+                                                       LOG.trace(indent + 
"Queueing -" + node.toString() + " (code 5)");
+                                               queuedNodes.add(node);
+                                               // TODO: does this have to be 
modified to handle
+                                               // recordreader lops?
+                                               
removeNodesForNextIteration(node, finishedNodes,
+                                                               execNodes, 
queuedNodes, jobNodes);
+                                               continue;
+                                       } else {
+                                               // nothing here.. subsequent 
checks have to be performed
+                                               // on "node"
                                        }
                                }
-                       }                       
-               }
-               
-       }
-
-       /**
-        * Method to check if a lop can be eliminated from checking
-        * 
-        * @param node low-level operator
-        * @param execNodes list of exec nodes
-        * @return true if lop can be eliminated
-        */
-       private static boolean canEliminateLop(Lop node, ArrayList<Lop> 
execNodes) {
-               // this function can only eliminate "aligner" lops such a group
-               if (!node.isAligner())
-                       return false;
 
-               // find the child whose execLoc = 'MapAndReduce'
-               int ret = getChildAlignment(node, execNodes, 
ExecLocation.MapAndReduce);
-
-               if (ret == CHILD_BREAKS_ALIGNMENT)
-                       return false;
-               else if (ret == CHILD_DOES_NOT_BREAK_ALIGNMENT)
-                       return true;
-               else if (ret == MRCHILD_NOT_FOUND)
-                       return false;
-               else if (ret == MR_CHILD_FOUND_BREAKS_ALIGNMENT)
-                       return false;
-               else if (ret == MR_CHILD_FOUND_DOES_NOT_BREAK_ALIGNMENT)
-                       return true;
-               else
-                       throw new RuntimeException("Should not happen. \n");
-       }
-       
-       
-       /**
-        * Method to generate createvar instructions, which creates a new entry
-        * in the symbol table. One instruction is generated for every LOP that 
is 
-        * 1) type Data and 
-        * 2) persistent and 
-        * 3) matrix and 
-        * 4) read
-        * 
-        * Transient reads needn't be considered here since the previous 
program 
-        * block would already create appropriate entries in the symbol table.
-        * 
-        * @param nodes_v list of nodes
-        * @param inst list of instructions
-        */
-       private static void 
generateInstructionsForInputVariables(ArrayList<Lop> nodes_v, 
ArrayList<Instruction> inst) {
-               for(Lop n : nodes_v) {
-                       if (n.getExecLocation() == ExecLocation.Data && 
!((Data) n).isTransient() 
-                                       && ((Data) n).getOperationType() == 
OperationTypes.READ 
-                                       && (n.getDataType() == DataType.MATRIX 
|| n.getDataType() == DataType.FRAME) ) {
-                               
-                               if ( !((Data)n).isLiteral() ) {
-                                       try {
-                                               String inst_string = 
n.getInstructions();
-                                               CPInstruction currInstr = 
CPInstructionParser.parseSingleInstruction(inst_string);
-                                               currInstr.setLocation(n);
-                                               inst.add(currInstr);
-                                       } catch (DMLRuntimeException e) {
-                                               throw new 
LopsException(n.printErrorLocation() + "error generating instructions from 
input variables in Dag -- \n", e);
-                                       }
-                               }
-                       }
-               }
-       }
-       
-       
-       /**
-        * Determine whether to send <code>node</code> to MR or to process it 
in the control program.
-        * It is sent to MR in the following cases:
-        * 
-        * 1) if input lop gets processed in MR then <code>node</code> can be 
piggybacked
-        * 
-        * 2) if the exectype of write lop itself is marked MR i.e., memory 
estimate > memory budget.
-        * 
-        * @param node low-level operator
-        * @return true if lop should be sent to MR
-        */
-       private static boolean sendWriteLopToMR(Lop node) 
-       {
-               if ( DMLScript.rtplatform == RUNTIME_PLATFORM.SINGLE_NODE )
-                       return false;
-               Lop in = node.getInputs().get(0);
-               Format nodeFormat = node.getOutputParameters().getFormat();
-               
-               //send write lop to MR if (1) it is marked with exec type MR 
(based on its memory estimate), or
-               //(2) if the input lop is in MR and the write format allows to 
pack it into the same job (this does
-               //not apply to csv write because MR csvwrite is a separate MR 
job type)
-               return (node.getExecType() == ExecType.MR
-                       || (in.getExecType() == ExecType.MR && nodeFormat != 
Format.CSV));
-       }
-       
-       /**
-        * Computes the memory footprint required to execute <code>node</code> 
in the mapper.
-        * It is used only for those nodes that use inputs from distributed 
cache. The returned 
-        * value is utilized in limiting the number of instructions piggybacked 
onto a single GMR mapper.
-        * 
-        * @param node low-level operator
-        * @return memory footprint
-        */
-       private static double computeFootprintInMapper(Lop node) {
-               // Memory limits must be checked only for nodes that use 
distributed cache
-               if ( ! node.usesDistributedCache() )
-                       // default behavior
-                       return 0.0;
-               
-               OutputParameters in1dims = 
node.getInputs().get(0).getOutputParameters();
-               OutputParameters in2dims = 
node.getInputs().get(1).getOutputParameters();
-               
-               double footprint = 0;
-               if ( node instanceof MapMult ) {
-                       int dcInputIndex = node.distributedCacheInputIndex()[0];
-                       footprint = AggBinaryOp.getMapmmMemEstimate(
-                                       in1dims.getNumRows(), 
in1dims.getNumCols(), in1dims.getRowsInBlock(), in1dims.getColsInBlock(), 
in1dims.getNnz(),
-                                       in2dims.getNumRows(), 
in2dims.getNumCols(), in2dims.getRowsInBlock(), in2dims.getColsInBlock(), 
in2dims.getNnz(),
-                                       dcInputIndex, false);
-               }
-               else if ( node instanceof PMMJ ) {
-                       int dcInputIndex = node.distributedCacheInputIndex()[0];
-                       footprint = AggBinaryOp.getMapmmMemEstimate(
-                                       in1dims.getNumRows(), 1, 
in1dims.getRowsInBlock(), in1dims.getColsInBlock(), in1dims.getNnz(),
-                                       in2dims.getNumRows(), 
in2dims.getNumCols(), in2dims.getRowsInBlock(), in2dims.getColsInBlock(), 
in2dims.getNnz(), 
-                                       dcInputIndex, true);
-               }
-               else if ( node instanceof AppendM ) {
-                       footprint = BinaryOp.footprintInMapper(
-                                       in1dims.getNumRows(), 
in1dims.getNumCols(), 
-                                       in2dims.getNumRows(), 
in2dims.getNumCols(), 
-                                       in1dims.getRowsInBlock(), 
in1dims.getColsInBlock());
-               }
-               else if ( node instanceof BinaryM ) {
-                       footprint = BinaryOp.footprintInMapper(
-                                       in1dims.getNumRows(), 
in1dims.getNumCols(), 
-                                       in2dims.getNumRows(), 
in2dims.getNumCols(), 
-                                       in1dims.getRowsInBlock(), 
in1dims.getColsInBlock());
-               }
-               else {
-                       // default behavior
-                       return 0.0;
-               }
-               return footprint;
-       }
-       
-       /**
-        * Determines if <code>node</code> can be executed in current round of 
MR jobs or if it needs to be queued for later rounds.
-        * If the total estimated footprint (<code>node</code> and previously 
added nodes in GMR) is less than available memory on 
-        * the mappers then <code>node</code> can be executed in current round, 
and <code>true</code> is returned. Otherwise, 
-        * <code>node</code> must be queued and <code>false</code> is returned. 
-        * 
-        * @param node low-level operator
-        * @param footprintInMapper mapper footprint
-        * @return true if node can be executed in current round of jobs
-        */
-       private static boolean checkMemoryLimits(Lop node, double 
footprintInMapper) {
-               boolean addNode = true;
-               
-               // Memory limits must be checked only for nodes that use 
distributed cache
-               if ( ! node.usesDistributedCache() )
-                       // default behavior
-                       return addNode;
-               
-               double memBudget = Math.min(AggBinaryOp.MAPMULT_MEM_MULTIPLIER, 
BinaryOp.APPEND_MEM_MULTIPLIER) * OptimizerUtils.getRemoteMemBudgetMap(true);
-               if ( footprintInMapper <= memBudget ) 
-                       return addNode;
-               else
-                       return !addNode;
-       }
-       
-       /**
-        * Method to group a vector of sorted lops.
-        * 
-        * @param sb statement block
-        * @param node_v list of low-level operators
-        * @return list of instructions
-        */
-       private ArrayList<Instruction> doGreedyGrouping(StatementBlock sb, 
ArrayList<Lop> node_v)
-       {
-               if( LOG.isTraceEnabled() )
-                       LOG.trace("Grouping DAG ============");
-
-               // nodes to be executed in current iteration
-               ArrayList<Lop> execNodes = new ArrayList<>();
-               // nodes that have already been processed
-               ArrayList<Lop> finishedNodes = new ArrayList<>();
-               // nodes that are queued for the following iteration
-               ArrayList<Lop> queuedNodes = new ArrayList<>();
-
-               ArrayList<ArrayList<Lop>> jobNodes = 
createNodeVectors(JobType.getNumJobTypes());
-               
-               // list of instructions
-               ArrayList<Instruction> inst = new ArrayList<>();
-
-               //ArrayList<Instruction> preWriteDeleteInst = new 
ArrayList<Instruction>();
-               ArrayList<Instruction> writeInst = new ArrayList<>();
-               ArrayList<Instruction> deleteInst = new ArrayList<>();
-               ArrayList<Instruction> endOfBlockInst = new ArrayList<>();
-
-               // remove files for transient reads that are updated.
-               deleteUpdatedTransientReadVariables(sb, node_v, writeInst);
-               
-               generateRemoveInstructions(sb, endOfBlockInst);
-
-               generateInstructionsForInputVariables(node_v, inst);
-               
-               
-               boolean done = false;
-               String indent = "    ";
-
-               while (!done) {
-                       if( LOG.isTraceEnabled() )
-                               LOG.trace("Grouping nodes in DAG");
-
-                       execNodes.clear();
-                       queuedNodes.clear();
-                       clearNodeVectors(jobNodes);
-                       gmrMapperFootprint=0;
-
-                       for ( Lop  node : node_v ) {
-                               // finished nodes don't need to be processed
-                               if (finishedNodes.contains(node))
-                                       continue;
-
-                               if( LOG.isTraceEnabled() )
-                                       LOG.trace("Processing node (" + 
node.getID()
-                                                       + ") " + 
node.toString() + " exec nodes size is " + execNodes.size());
-                               
-                               
-                       //if node defines MR job, make sure it is compatible 
with all 
-                       //its children nodes in execNodes 
-                       if(node.definesMRJob() && 
!compatibleWithChildrenInExecNodes(execNodes, node))
-                       {
-                               if( LOG.isTraceEnabled() )                      
-                                 LOG.trace(indent + "Queueing node "
-                                       + node.toString() + " (code 1)");
-                       
-                         queuedNodes.add(node);
-                         removeNodesForNextIteration(node, finishedNodes, 
execNodes, queuedNodes, jobNodes);
-                         continue;
-                       }
-
-                               // if child is queued, this node will be 
processed in the later
-                               // iteration
-                               if (hasChildNode(node,queuedNodes)) {
-
-                                       if( LOG.isTraceEnabled() )
-                                               LOG.trace(indent + "Queueing 
node "
-                                                               + 
node.toString() + " (code 2)");
-                                       queuedNodes.add(node);
-
-                                       // if node has more than two inputs,
-                                       // remove children that will be needed 
in a future
-                                       // iterations
-                                       // may also have to remove parent nodes 
of these children
-                                       removeNodesForNextIteration(node, 
finishedNodes, execNodes,
-                                                       queuedNodes, jobNodes);
-
-                                       continue;
-                               }
-                               
-                               // if inputs come from different jobs, then 
queue
-                               if ( node.getInputs().size() >= 2) {
-                                       int jobid = Integer.MIN_VALUE;
-                                       boolean queueit = false;
-                                       for(int idx=0; idx < 
node.getInputs().size(); idx++) {
-                                               int input_jobid = 
jobType(node.getInputs().get(idx), jobNodes);
-                                               if (input_jobid != -1) {
-                                                       if ( jobid == 
Integer.MIN_VALUE )
-                                                               jobid = 
input_jobid;
-                                                       else if ( jobid != 
input_jobid ) { 
-                                                               queueit = true;
-                                                               break;
-                                                       }
-                                               }
-                                       }
-                                       if ( queueit ) {
-                                               if( LOG.isTraceEnabled() )
-                                                       LOG.trace(indent + 
"Queueing node " + node.toString() + " (code 3)");
-                                               queuedNodes.add(node);
-                                               
removeNodesForNextIteration(node, finishedNodes, execNodes, queuedNodes, 
jobNodes);
-                                               continue;
-                                       }
-                               }
-
-                               // See if this lop can be eliminated
-                               // This check is for "aligner" lops (e.g., 
group)
-                               boolean eliminate = false;
-                               eliminate = canEliminateLop(node, execNodes);
-                               if (eliminate) {
-                                       if( LOG.isTraceEnabled() )
-                                               LOG.trace(indent + "Adding -"+ 
node.toString());
-                                       execNodes.add(node);
-                                       finishedNodes.add(node);
-                                       addNodeByJobType(node, jobNodes, 
execNodes, eliminate);
-                                       continue;
-                               }
-
-                               // If the node defines a MR Job then make sure 
none of its
-                               // children that defines a MR Job are present 
in execNodes
-                               if (node.definesMRJob()) {
-                                       if (hasMRJobChildNode(node, execNodes)) 
{
-                                               // "node" must NOT be queued 
when node=group and the child that defines job is Rand
-                                               // this is because "group" can 
be pushed into the "Rand" job.
-                                               if (! (node.getType() == 
Lop.Type.Grouping && checkDataGenAsChildNode(node,execNodes))  ) {
-                                                       if( 
LOG.isTraceEnabled() )
-                                                               
LOG.trace(indent + "Queueing node " + node.toString() + " (code 4)");
-
-                                                       queuedNodes.add(node);
-
-                                                       
removeNodesForNextIteration(node, finishedNodes,
-                                                                       
execNodes, queuedNodes, jobNodes);
-
-                                                       continue;
-                                               }
-                                       }
-                               }
-
-                               // if "node" has more than one input, and has a 
descendant lop
-                               // in execNodes that is of type RecordReader
-                               // then all its inputs must be ancestors of 
RecordReader. If
-                               // not, queue "node"
-                               if (node.getInputs().size() > 1
-                                               && hasChildNode(node, 
execNodes, ExecLocation.RecordReader)) {
-                                       // get the actual RecordReader lop
-                                       Lop rr_node = getChildNode(node, 
execNodes, ExecLocation.RecordReader);
-
-                                       // all inputs of "node" must be 
ancestors of rr_node
-                                       boolean queue_it = false;
-                                       for (Lop n : node.getInputs()) {
-                                               // each input should be 
ancestor of RecordReader lop
-                                               if (!n.equals(rr_node) && 
!isChild(rr_node, n, IDMap)) {
-                                                       queue_it = true; // 
i.e., "node" must be queued
-                                                       break;
-                                               }
-                                       }
-
-                                       if (queue_it) {
-                                               // queue node
-                                               if( LOG.isTraceEnabled() )
-                                                       LOG.trace(indent + 
"Queueing -" + node.toString() + " (code 5)");
-                                               queuedNodes.add(node);
-                                               // TODO: does this have to be 
modified to handle
-                                               // recordreader lops?
-                                               
removeNodesForNextIteration(node, finishedNodes,
-                                                               execNodes, 
queuedNodes, jobNodes);
-                                               continue;
-                                       } else {
-                                               // nothing here.. subsequent 
checks have to be performed
-                                               // on "node"
-                                       }
-                               }
-
-                               // data node, always add if child not queued
-                               // only write nodes are kept in execnodes
-                               if (node.getExecLocation() == 
ExecLocation.Data) {
-                                       Data dnode = (Data) node;
-                                       boolean dnode_queued = false;
-                                       
-                                       if ( dnode.getOperationType() == 
OperationTypes.READ ) {
-                                               if( LOG.isTraceEnabled() )
-                                                       LOG.trace(indent + 
"Adding Data -"+ node.toString());
+                               // data node, always add if child not queued
+                               // only write nodes are kept in execnodes
+                               if (node.getExecLocation() == 
ExecLocation.Data) {
+                                       Data dnode = (Data) node;
+                                       boolean dnode_queued = false;
+                                       
+                                       if ( dnode.getOperationType() == 
OperationTypes.READ ) {
+                                               if( LOG.isTraceEnabled() )
+                                                       LOG.trace(indent + 
"Adding Data -"+ node.toString());
 
                                                // TODO: avoid readScalar 
instruction, and read it on-demand just like the way Matrices are read in 
control program
                                                if ( node.getDataType() == 
DataType.SCALAR 
@@ -957,10 +542,7 @@ public class Dag<N extends Lop>
                                                // TODO: this case should 
ideally be handled in the language layer 
                                                //       prior to the 
construction of Hops Dag 
                                                Lop input = 
dnode.getInputs().get(0);
-                                               if ( dnode.isTransient() 
-                                                               && 
input.getExecLocation() == ExecLocation.Data 
-                                                               && 
((Data)input).isTransient() 
-                                                               && 
dnode.getOutputParameters().getLabel().equals(input.getOutputParameters().getLabel())
 ) {
+                                               if ( 
isTransientWriteRead(dnode) ) {
                                                        // do nothing, 
<code>node</code> must not processed any further.
                                                }
                                                else if ( 
execNodes.contains(input) && !isCompatible(node, input) && 
sendWriteLopToMR(node)) {
@@ -1078,122 +660,601 @@ public class Dag<N extends Lop>
                                        continue;
                                }
 
-                               // aligned reduce, make sure a parent that is 
reduce exists
-                               if (node.getExecLocation() == 
ExecLocation.Reduce) {
-                                       if (  
compatibleWithChildrenInExecNodes(execNodes, node) && 
-                                                       (hasChildNode(node, 
execNodes, ExecLocation.MapAndReduce)
-                                                        || hasChildNode(node, 
execNodes, ExecLocation.Map) ) ) 
-                                       { 
-                                               if( LOG.isTraceEnabled() )
-                                                       LOG.trace(indent + 
"Adding -"+ node.toString());
-                                               execNodes.add(node);
-                                               finishedNodes.add(node);
-                                               addNodeByJobType(node, 
jobNodes, execNodes, false);
-                                       } else {
-                                               if( LOG.isTraceEnabled() )
-                                                       LOG.trace(indent + 
"Queueing -"+ node.toString() + " (code 8)");
-                                               queuedNodes.add(node);
-                                               
removeNodesForNextIteration(node, finishedNodes,
-                                                               execNodes, 
queuedNodes, jobNodes);
-                                       }
+                               // aligned reduce, make sure a parent that is 
reduce exists
+                               if (node.getExecLocation() == 
ExecLocation.Reduce) {
+                                       if (  
compatibleWithChildrenInExecNodes(execNodes, node) && 
+                                                       (hasChildNode(node, 
execNodes, ExecLocation.MapAndReduce)
+                                                        || hasChildNode(node, 
execNodes, ExecLocation.Map) ) ) 
+                                       { 
+                                               if( LOG.isTraceEnabled() )
+                                                       LOG.trace(indent + 
"Adding -"+ node.toString());
+                                               execNodes.add(node);
+                                               finishedNodes.add(node);
+                                               addNodeByJobType(node, 
jobNodes, execNodes, false);
+                                       } else {
+                                               if( LOG.isTraceEnabled() )
+                                                       LOG.trace(indent + 
"Queueing -"+ node.toString() + " (code 8)");
+                                               queuedNodes.add(node);
+                                               
removeNodesForNextIteration(node, finishedNodes,
+                                                               execNodes, 
queuedNodes, jobNodes);
+                                       }
+
+                                       continue;
+
+                               }
+
+                               // add Scalar to execNodes if it has no child 
in exec nodes
+                               // that will be executed in a MR job.
+                               if (node.getExecLocation() == 
ExecLocation.ControlProgram) {
+                                       for ( Lop lop : node.getInputs() ) {
+                                               if (execNodes.contains(lop)
+                                                               && 
!(lop.getExecLocation() == ExecLocation.Data)
+                                                               && 
!(lop.getExecLocation() == ExecLocation.ControlProgram)) {
+                                                       if( 
LOG.isTraceEnabled() )
+                                                               
LOG.trace(indent + "Queueing -"+ node.toString() + " (code 9)");
+
+                                                       queuedNodes.add(node);
+                                                       
removeNodesForNextIteration(node, finishedNodes,
+                                                                       
execNodes, queuedNodes, jobNodes);
+                                                       break;
+                                               }
+                                       }
+
+                                       if (queuedNodes.contains(node))
+                                               continue;
+                                       if( LOG.isTraceEnabled() )
+                                               LOG.trace(indent + "Adding - 
scalar"+ node.toString());
+                                       execNodes.add(node);
+                                       addNodeByJobType(node, jobNodes, 
execNodes, false);
+                                       finishedNodes.add(node);
+                                       continue;
+                               }
+
+                       }
+
+                       // no work to do
+                       if ( execNodes.isEmpty() ) {
+                         
+                         if( !queuedNodes.isEmpty() )
+                                 throw new LopsException("Queued nodes should 
not be 0 at this point \n");
+                         
+                         if( LOG.isTraceEnabled() )
+                               LOG.trace("All done! queuedNodes = "+ 
queuedNodes.size());
+                               
+                         done = true;
+                       } else {
+                               // work to do
+
+                               if( LOG.isTraceEnabled() )
+                                       LOG.trace("Generating jobs for group -- 
Node count="+ execNodes.size());
+
+                               // first process scalar instructions
+                               generateControlProgramJobs(execNodes, inst, 
writeInst, deleteInst);
+
+                               // copy unassigned lops in execnodes to gmrnodes
+                               for (int i = 0; i < execNodes.size(); i++) {
+                                       Lop node = execNodes.get(i);
+                                       if (jobType(node, jobNodes) == -1) {
+                                               if ( isCompatible(node,  
JobType.GMR) ) {
+                                                       if ( 
node.hasNonBlockedInputs() ) {
+                                                               
jobNodes.get(JobType.GMRCELL.getId()).add(node);
+                                                               
addChildren(node, jobNodes.get(JobType.GMRCELL.getId()), execNodes);
+                                                       }
+                                                       else {
+                                                               
jobNodes.get(JobType.GMR.getId()).add(node);
+                                                               
addChildren(node, jobNodes.get(JobType.GMR.getId()), execNodes);
+                                                       }
+                                               }
+                                               else {
+                                                       if( 
LOG.isTraceEnabled() )
+                                                               
LOG.trace(indent + "Queueing -" + node.toString() + " (code 10)");
+                                                       execNodes.remove(i);
+                                                       
finishedNodes.remove(node);
+                                                       queuedNodes.add(node);
+                                                       
removeNodesForNextIteration(node, finishedNodes,
+                                                               execNodes, 
queuedNodes, jobNodes);
+                                               }
+                                       }
+                               }
+
+                               // next generate MR instructions
+                               if (!execNodes.isEmpty())
+                                       generateMRJobs(execNodes, inst, 
writeInst, deleteInst, jobNodes);
+                               handleSingleOutputJobs(execNodes, jobNodes, 
finishedNodes);
+                       }
+               }
+
+               // add write and delete inst at the very end.
+               inst.addAll(writeInst);
+               inst.addAll(deleteInst);
+               inst.addAll(endOfBlockInst);
+               return inst;
+       }
+
+       /**
+        * Generates the instructions of a DAG by handing all executable lops
+        * to generateControlProgramJobs in a single call.
+        *
+        * @param sb statement block of the DAG (passed to the remove helpers)
+        * @param nodes all lops of the DAG
+        * @return input-variable and executable instructions, followed by the
+        *         write, delete and end-of-block instructions (in that order)
+        */
+       private ArrayList<Instruction> doPlainInstructionGen(StatementBlock sb,
+               List<Lop> nodes)
+       {
+               // instruction buckets; appended to the result at the very end
+               List<Instruction> deleteInst = new ArrayList<>();
+               List<Instruction> writeInst =
+                       deleteUpdatedTransientReadVariables(sb, nodes);
+               List<Instruction> endOfBlockInst = generateRemoveInstructions(sb);
+               ArrayList<Instruction> inst =
+                       generateInstructionsForInputVariables(nodes);
+
+               // Data lops are dropped unless they are a write that is not a
+               // transient write-read pair, or a persistent read of a scalar
+               List<Lop> execNodes = new ArrayList<>();
+               for( Lop lop : nodes ) {
+                       boolean keep = true;
+                       if( lop.getExecLocation() == ExecLocation.Data ) {
+                               Data data = (Data) lop;
+                               boolean effectiveWrite =
+                                       data.getOperationType() == OperationTypes.WRITE
+                                       && !isTransientWriteRead(data);
+                               keep = effectiveWrite
+                                       || (data.isPersistentRead()
+                                               && lop.getDataType().isScalar());
+                       }
+                       if( keep )
+                               execNodes.add(lop);
+               }
+
+               // generate executable instruction
+               generateControlProgramJobs(execNodes, inst, writeInst, deleteInst);
+
+               // add write and delete inst at the very end.
+               inst.addAll(writeInst);
+               inst.addAll(deleteInst);
+               inst.addAll(endOfBlockInst);
+               return inst;
+       }
+       
+       /**
+        * Checks whether a transient data lop is fed directly by another
+        * transient data lop that carries the same label.
+        *
+        * @param dnode data lop to inspect; its first input is always accessed
+        * @return true if dnode and its first input are both transient data
+        *         lops with equal labels
+        */
+       private boolean isTransientWriteRead(Data dnode) {
+               Lop source = dnode.getInputs().get(0);
+               if( !dnode.isTransient() )
+                       return false;
+               if( source.getExecLocation() != ExecLocation.Data )
+                       return false;
+               if( !((Data) source).isTransient() )
+                       return false;
+               String label = dnode.getOutputParameters().getLabel();
+               return label.equals(source.getOutputParameters().getLabel());
+       }
+       
+       /**
+        * Creates remove instructions for matrix variables that are read
+        * transiently and overwritten by a transient write in the same DAG,
+        * as long as the value read does not feed any data lop.
+        *
+        * @param sb statement block; if null, an empty list is returned
+        * @param nodeV lops of the DAG
+        * @return modifiable list of remove instructions (possibly empty)
+        */
+       private static List<Instruction> deleteUpdatedTransientReadVariables(
+               StatementBlock sb, List<Lop> nodeV)
+       {
+               List<Instruction> rmInsts = new ArrayList<>();
+               if( sb == null )
+                       return rmInsts; // must stay modifiable
+
+               if( LOG.isTraceEnabled() )
+                       LOG.trace("In delete updated variables");
+
+               // Pass 1: CANDIDATES, i.e., transient matrix reads whose old value
+               // does not feed any data lop (a parent of type Data has to be
+               // a WRITE); keyed by variable label.
+               HashMap<String, Lop> candidateReads = new HashMap<>();
+               for( Lop lop : nodeV ) {
+                       if( lop.getExecLocation() != ExecLocation.Data )
+                               continue;
+                       Data data = (Data) lop;
+                       if( !data.isTransient()
+                               || data.getOperationType() != OperationTypes.READ
+                               || data.getDataType() != DataType.MATRIX )
+                               continue;
+                       boolean feedsWrite = lop.getOutputs().stream()
+                               .anyMatch(p -> p.getExecLocation() == ExecLocation.Data);
+                       if( !feedsWrite )
+                               candidateReads.put(
+                                       lop.getOutputParameters().getLabel(), lop);
+               }
+
+               // Pass 2: ACTUAL updates, i.e., transient matrix writes with the
+               // label of a candidate; writes whose first input is itself a
+               // candidate read are skipped.
+               HashSet<String> updatedLabels = new HashSet<>();
+               HashMap<String, Lop> writeByLabel = new HashMap<>();
+               for( Lop lop : nodeV ) {
+                       if( lop.getExecLocation() != ExecLocation.Data )
+                               continue;
+                       Data data = (Data) lop;
+                       boolean transientMatrixWrite = data.isTransient()
+                               && data.getOperationType() == OperationTypes.WRITE
+                               && data.getDataType() == DataType.MATRIX;
+                       if( !transientMatrixWrite )
+                               continue;
+                       String label = lop.getOutputParameters().getLabel();
+                       if( candidateReads.containsKey(label)
+                               && !candidateReads.containsValue(lop.getInputs().get(0)) ) {
+                               updatedLabels.add(label);
+                               writeByLabel.put(label, lop);
+                       }
+               }
+
+               // Pass 3: one remove instruction per updated label, located at
+               // the overwriting write lop.
+               for( String label : updatedLabels ) {
+                       Instruction rm =
+                               VariableCPInstruction.prepareRemoveInstruction(label);
+                       rm.setLocation(writeByLabel.get(label));
+                       if( LOG.isTraceEnabled() )
+                               LOG.trace(rm.toString());
+                       rmInsts.add(rm);
+               }
+               return rmInsts;
+       }
+       
+       /**
+        * Creates remove instructions for every variable that is live-in but
+        * not live-out of the given statement block.
+        *
+        * @param sb statement block; if null, an empty list is returned
+        * @return list of remove instructions located at the block's end line
+        */
+       private static List<Instruction> generateRemoveInstructions(
+               StatementBlock sb)
+       {
+               if( sb == null )
+                       return Collections.emptyList();
+               ArrayList<Instruction> rmInsts = new ArrayList<>();
+
+               if( LOG.isTraceEnabled() )
+                       LOG.trace("In generateRemoveInstructions()");
+
+               // RULE 1: if in IN and not in OUT, then there should be an rmvar
+               // or rmfilevar inst (currently required for specific cases of
+               // external functions)
+               for( String name : sb.liveIn().getVariableNames() ) {
+                       if( sb.liveOut().containsVariable(name) )
+                               continue;
+                       Instruction rm =
+                               VariableCPInstruction.prepareRemoveInstruction(name);
+                       rm.setLocation(sb.getFilename(),
+                               sb.getEndLine(), sb.getEndLine(), -1, -1);
+                       rmInsts.add(rm);
+                       if( LOG.isTraceEnabled() )
+                               LOG.trace("  Adding " + rm.toString());
+               }
+               return rmInsts;
+       }
+
+       /**
+        * Creates a list of empty, independent lop lists.
+        *
+        * @param size number of inner lists to create
+        * @return list holding {@code size} empty lists (empty if size <= 0)
+        */
+       private static List<List<Lop>> createNodeVectors(int size) {
+               List<List<Lop>> vectors = new ArrayList<>();
+               for( int i = 0; i < size; i++ )
+                       vectors.add(new ArrayList<Lop>());
+               return vectors;
+       }
 
-                                       continue;
+       /**
+        * Empties every inner list; the outer list keeps its entries.
+        *
+        * @param arr list of lop lists to clear
+        */
+       private static void clearNodeVectors(List<List<Lop>> arr) {
+               for( List<Lop> vector : arr )
+                       vector.clear();
+       }
 
-                               }
+       /**
+        * Checks whether every lop in the list is compatible with the given
+        * job type.
+        *
+        * NOTE(review): from and to are not evaluated; the complete list is
+        * checked regardless of the range passed by the caller.
+        *
+        * @param nodes lops to check
+        * @param jt job type whose base mask is tested
+        * @param from unused
+        * @param to unused
+        * @return true if no lop has an empty intersection with jt's base mask
+        */
+       private static boolean isCompatible(List<Lop> nodes, JobType jt,
+               int from, int to)
+       {
+               for( Lop lop : nodes ) {
+                       if( (lop.getCompatibleJobs() & jt.getBase()) == 0 )
+                               return false;
+               }
+               return true;
+       }
 
-                               // add Scalar to execNodes if it has no child 
in exec nodes
-                               // that will be executed in a MR job.
-                               if (node.getExecLocation() == 
ExecLocation.ControlProgram) {
-                                       for ( Lop lop : node.getInputs() ) {
-                                               if (execNodes.contains(lop)
-                                                               && 
!(lop.getExecLocation() == ExecLocation.Data)
-                                                               && 
!(lop.getExecLocation() == ExecLocation.ControlProgram)) {
-                                                       if( 
LOG.isTraceEnabled() )
-                                                               
LOG.trace(indent + "Queueing -"+ node.toString() + " (code 9)");
+       /**
+        * Function that determines if the two input nodes can be executed
+        * together in at least one job.
+        *
+        * The compatible-job values are combined as bit masks; the
+        * intersection is tested with {@code != 0} so that the result does
+        * not depend on the sign bit of the mask.
+        *
+        * @param node1 low-level operator 1
+        * @param node2 low-level operator 2
+        * @return true if nodes can be executed together
+        */
+       private static boolean isCompatible(Lop node1, Lop node2) {
+               int shared = node1.getCompatibleJobs() & node2.getCompatibleJobs();
+               return shared != 0;
+       }
+       
+       /**
+        * Function that checks if the given node executes in the job specified
+        * by jt. GMRCELL is checked against the GMR base mask.
+        *
+        * The mask intersection is tested with {@code != 0} so that the
+        * result does not depend on the sign bit of the mask.
+        *
+        * @param node low-level operator
+        * @param jt job type
+        * @return true if node executes in the specified job type
+        */
+       private static boolean isCompatible(Lop node, JobType jt) {
+               JobType effective = (jt == JobType.GMRCELL) ? JobType.GMR : jt;
+               return (node.getCompatibleJobs() & effective.getBase()) != 0;
+       }
 
-                                                       queuedNodes.add(node);
-                                                       
removeNodesForNextIteration(node, finishedNodes,
-                                                                       
execNodes, queuedNodes, jobNodes);
-                                                       break;
+       /*
+        * Add node, and its relevant children to job-specific node vectors.
+        *
+        * node:      lop to be assigned to a job vector
+        * arr:       job-specific node vectors, indexed by JobType id
+        * execNodes: list that is handed on to addChildren
+        * eliminate: if true, node is added directly to the GMR or GMRCELL
+        *            vector and nothing else is done
+        *
+        * Throws LopsException if a job-defining lop has no matching JobType,
+        * if the compatibility check after adding the children fails, or if a
+        * lop that does not define a job was added to more than one vector.
+        */
+       private void addNodeByJobType(Lop node, List<List<Lop>> arr, List<Lop> 
execNodes, boolean eliminate) {
+               if (!eliminate) {
+                       // Check if this lop defines a MR job.
+                       if ( node.definesMRJob() ) {
+                               
+                               // find the corresponding JobType
+                               JobType jt = JobType.findJobTypeFromLop(node);
+                               
+                               if ( jt == null ) {
+                                       throw new 
LopsException(node.printErrorLocation() + "No matching JobType is found for a 
the lop type: " + node.getType() + " \n");
+                               }
+                               
+                               // Add "node" to corresponding job vector
+                               
+                               if ( jt == JobType.GMR ) {
+                                       if ( node.hasNonBlockedInputs() ) {
+                                               int gmrcell_index = 
JobType.GMRCELL.getId();
+                                               
arr.get(gmrcell_index).add(node);
+                                               // from/to: vector size before and after addChildren
+                                               int from = 
arr.get(gmrcell_index).size();
+                                               addChildren(node, 
arr.get(gmrcell_index), execNodes);
+                                               int to = 
arr.get(gmrcell_index).size();
+                                               if 
(!isCompatible(arr.get(gmrcell_index),JobType.GMR, from, to))  // check against 
GMR only, not against GMRCELL
+                                                       throw new 
LopsException(node.printErrorLocation() + "Error during compatibility check 
\n");
+                                       }
+                                       else {
+                                               // if "node" (in this case, a 
group lop) has any inputs from RAND 
+                                               // then add it to RAND job. 
Otherwise, create a GMR job
+                                               if (hasChildNode(node, 
arr.get(JobType.DATAGEN.getId()) )) {
+                                                       
arr.get(JobType.DATAGEN.getId()).add(node);
+                                                       // we should NOT call 
'addChildren' because appropriate
+                                                       // child nodes would 
have got added to RAND job already
+                                               } else {
+                                                       int gmr_index = 
JobType.GMR.getId();
+                                                       
arr.get(gmr_index).add(node);
+                                                       int from = 
arr.get(gmr_index).size();
+                                                       addChildren(node, 
arr.get(gmr_index), execNodes);
+                                                       int to = 
arr.get(gmr_index).size();
+                                                       if 
(!isCompatible(arr.get(gmr_index),JobType.GMR, from, to)) 
+                                                               throw new 
LopsException(node.printErrorLocation() + "Error during compatibility check 
\n");
                                                }
                                        }
-
-                                       if (queuedNodes.contains(node))
-                                               continue;
-                                       if( LOG.isTraceEnabled() )
-                                               LOG.trace(indent + "Adding - 
scalar"+ node.toString());
-                                       execNodes.add(node);
-                                       addNodeByJobType(node, jobNodes, 
execNodes, false);
-                                       finishedNodes.add(node);
-                                       continue;
                                }
-
+                               else {
+                                       int index = jt.getId();
+                                       arr.get(index).add(node);
+                                       int from = arr.get(index).size();
+                                       addChildren(node, arr.get(index), 
execNodes);
+                                       int to = arr.get(index).size();
+                                       // check if all added nodes are 
compatible with current job
+                                       if (!isCompatible(arr.get(index), jt, 
from, to)) {
+                                               throw new LopsException( 
+                                                               "Unexpected 
error in addNodeByType.");
+                                       }
+                               }
+                               return;
                        }
+               }
 
-                       // no work to do
-                       if ( execNodes.isEmpty() ) {
-                         
-                         if( !queuedNodes.isEmpty() )
-                                 throw new LopsException("Queued nodes should 
not be 0 at this point \n");
-                         
-                         if( LOG.isTraceEnabled() )
-                               LOG.trace("All done! queuedNodes = "+ 
queuedNodes.size());
-                               
-                         done = true;
-                       } else {
-                               // work to do
+               if ( eliminate ) {
+                       // Eliminated lops are directly added to GMR queue. 
+                       // Note that eliminate flag is set only for 'group' lops
+                       if ( node.hasNonBlockedInputs() )
+                               arr.get(JobType.GMRCELL.getId()).add(node);
+                       else
+                               arr.get(JobType.GMR.getId()).add(node);
+                       return;
+               }
+               
+               /*
+                * If this lop does not define a job, check if it uses the 
output of any
+                * specialized job. i.e., if this lop has a child node in any 
of the
+                * job-specific vector, then add it to the vector. Note: This 
lop must
+                * be added to ONLY ONE of the job-specific vectors.
+                */
 
-                               if( LOG.isTraceEnabled() )
-                                       LOG.trace("Generating jobs for group -- 
Node count="+ execNodes.size());
+               int numAdded = 0;
+               // job types with an id <= 0 are never considered here
+               for ( JobType j : JobType.values() ) {
+                       if ( j.getId() > 0 && hasDirectChildNode(node, 
arr.get(j.getId()))) {
+                               if (isCompatible(node, j)) {
+                                       arr.get(j.getId()).add(node);
+                                       numAdded += 1;
+                               }
+                       }
+               }
+               // the check runs after the loop, i.e., node has already been added
+               if (numAdded > 1) {
+                       throw new LopsException("Unexpected error in 
addNodeByJobType(): A given lop can ONLY be added to a single job vector 
(numAdded = " + numAdded + ")." );
+               }
+       }
 
-                               // first process scalar instructions
-                               generateControlProgramJobs(execNodes, inst, 
writeInst, deleteInst);
+       /*
+        * Removes the node from every job-specific node vector whose job type
+        * has an id greater than zero. This method is invoked from
+        * removeNodesForNextIteration().
+        */
+       private static void removeNodeByJobType(Lop node, List<List<Lop>> arr) {
+               for( JobType jt : JobType.values() ) {
+                       int id = jt.getId();
+                       if( id <= 0 )
+                               continue;
+                       arr.get(id).remove(node);
+               }
+       }
 
-                               // copy unassigned lops in execnodes to gmrnodes
-                               for (int i = 0; i < execNodes.size(); i++) {
-                                       Lop node = execNodes.get(i);
-                                       if (jobType(node, jobNodes) == -1) {
-                                               if ( isCompatible(node,  
JobType.GMR) ) {
-                                                       if ( 
node.hasNonBlockedInputs() ) {
-                                                               
jobNodes.get(JobType.GMRCELL.getId()).add(node);
-                                                               
addChildren(node, jobNodes.get(JobType.GMRCELL.getId()), execNodes);
-                                                       }
-                                                       else {
-                                                               
jobNodes.get(JobType.GMR.getId()).add(node);
-                                                               
addChildren(node, jobNodes.get(JobType.GMR.getId()), execNodes);
+       /**
+        * As some jobs only write one output, all operations in the mapper 
need to
+        * be redone and cannot be marked as finished.
+        * 
+        * Only finishedNodes is modified: affected lops are removed from it.
+        * 
+        * @param execNodes list of exec low-level operators
+        * @param jobNodes list of job low-level operators
+        * @param finishedNodes list of finished low-level operators
+        */
+       private void handleSingleOutputJobs(List<Lop> execNodes, 
List<List<Lop>> jobNodes, List<Lop> finishedNodes)
+       {
+               /*
+                * If the input of a MMCJ/MMRJ job (must have executed in a 
Mapper) is used
+                * by multiple lops then we should mark it as not-finished.
+                */
+               ArrayList<Lop> nodesWithUnfinishedOutputs = new ArrayList<>();
+               // parallel arrays: jobIndices[i] is paired with lopTypes[i];
+               // only MMCJ is listed at the moment
+               int[] jobIndices = {JobType.MMCJ.getId()};
+               Lop.Type[] lopTypes = { Lop.Type.MMCJ};
+               
+               // TODO: SortByValue should be treated similar to MMCJ, since 
it can
+               // only sort one file now
+               
+               for ( int jobi=0; jobi < jobIndices.length; jobi++ ) {
+                       int jindex = jobIndices[jobi];
+                       if (!jobNodes.get(jindex).isEmpty()) {
+                               List<Lop> vec = jobNodes.get(jindex);
+                               // first find all nodes with more than one 
parent that is not finished.
+                               for (int i = 0; i < vec.size(); i++) {
+                                       Lop node = vec.get(i);
+                                       if (node.getExecLocation() == 
ExecLocation.MapOrReduce
+                                                       || 
node.getExecLocation() == ExecLocation.Map) {
+                                               Lop MRparent = 
getParentNode(node, execNodes, ExecLocation.MapAndReduce);
+                                               if ( MRparent != null && 
MRparent.getType() == lopTypes[jobi]) {
+                                                       int numParents = 
node.getOutputs().size();
+                                                       if (numParents > 1) {
+                                                               // NOTE(review): node is appended once per unfinished
+                                                               // parent, so the list may contain duplicates
+                                                               for (int j = 0; 
j < numParents; j++) {
+                                                                       if 
(!finishedNodes.contains(node.getOutputs()
+                                                                               
        .get(j)))
+                                                                               
nodesWithUnfinishedOutputs.add(node);
+                                                               }
                                                        }
                                                }
-                                               else {
-                                                       if( 
LOG.isTraceEnabled() )
-                                                               
LOG.trace(indent + "Queueing -" + node.toString() + " (code 10)");
-                                                       execNodes.remove(i);
+                                       } 
+                               }
+
+                               // need to redo all nodes in 
nodesWithUnfinishedOutputs as well as their children
+                               for ( Lop node : vec ) {
+                                       if (node.getExecLocation() == 
ExecLocation.MapOrReduce
+                                                       || 
node.getExecLocation() == ExecLocation.Map) {
+                                               if 
(nodesWithUnfinishedOutputs.contains(node))
                                                        
finishedNodes.remove(node);
-                                                       queuedNodes.add(node);
-                                                       
removeNodesForNextIteration(node, finishedNodes,
-                                                               execNodes, 
queuedNodes, jobNodes);
-                                               }
+                                               if (hasParentNode(node, 
nodesWithUnfinishedOutputs))
+                                                       
finishedNodes.remove(node);
+                                       }
+                               }
+                       }
+               }
+               
+       }
+
+       /**
+        * Checks whether an "aligner" lop can be eliminated.
+        * 
+        * @param node low-level operator
+        * @param execNodes list of exec nodes
+        * @return true if node is an aligner whose child alignment is not broken
+        */
+       private static boolean canEliminateLop(Lop node, List<Lop> execNodes) {
+               // this function can only eliminate "aligner" lops such as group
+               if (!node.isAligner())
+                       return false;
+               // find the child whose execLoc = 'MapAndReduce'
+               int ret = getChildAlignment(node, execNodes, 
ExecLocation.MapAndReduce);
+               if (ret == CHILD_BREAKS_ALIGNMENT)
+                       return false;
+               else if (ret == CHILD_DOES_NOT_BREAK_ALIGNMENT)
+                       return true;
+               else if (ret == MRCHILD_NOT_FOUND)
+                       return false;
+               else if (ret == MR_CHILD_FOUND_BREAKS_ALIGNMENT)
+                       return false;
+               else if (ret == MR_CHILD_FOUND_DOES_NOT_BREAK_ALIGNMENT)
+                       return true;
+               else
+                       throw new RuntimeException("Should not happen. \n");
+       }
+       
+       
+       /**
+        * Method to generate createvar instructions, which creates a new entry
+        * in the symbol table. One instruction is generated for every LOP that 
is 
+        * 1) type Data and 
+        * 2) persistent and 
+        * 3) matrix or frame and 
+        * 4) read
+        * 
+        * Transient reads needn't be considered here since the previous 
program 
+        * block would already create appropriate entries in the symbol table.
+        * 
+        * @param nodes_v list of nodes
+        * @return list of instructions
+        */
+       private static ArrayList<Instruction> 
generateInstructionsForInputVariables(List<Lop> nodes_v) {
+               ArrayList<Instruction> insts = new ArrayList<>();
+               for(Lop n : nodes_v) {
+                       if (n.getExecLocation() == ExecLocation.Data && 
!((Data) n).isTransient() 
+                                       && ((Data) n).getOperationType() == 
OperationTypes.READ 
+                                       && (n.getDataType() == DataType.MATRIX 
|| n.getDataType() == DataType.FRAME) ) {
+                               if ( !((Data)n).isLiteral() ) {
+                                       try {
+                                               String inst_string = 
n.getInstructions();
+                                               CPInstruction currInstr = 
CPInstructionParser.parseSingleInstruction(inst_string);
+                                               currInstr.setLocation(n);
+                                               insts.add(currInstr);
+                                       } catch (DMLRuntimeException e) {
+                                               throw new 
LopsException(n.printErrorLocation() + "error generating instructions from 
input variables in Dag -- \n", e);
                                        }
                                }
-
-                               // next generate MR instructions
-                               if (!execNodes.isEmpty())
-                                       generateMRJobs(execNodes, inst, 
writeInst, deleteInst, jobNodes);
-                               handleSingleOutputJobs(execNodes, jobNodes, 
finishedNodes);
                        }
                }
-
-               // add write and delete inst at the very end.
-
-               //inst.addAll(preWriteDeleteInst);
-               inst.addAll(writeInst);
-               inst.addAll(deleteInst);
-               inst.addAll(endOfBlockInst);
-
-               return inst;
-
+               return insts;
+       }
+       
+       
+       /**
+        * Determine whether to send <code>node</code> to MR or to process it 
in the control program.
+        * It is sent to MR in the following cases:
+        * 
+        * 1) if input lop gets processed in MR and the write format is not CSV,
+        *    then <code>node</code> can be piggybacked
+        * 
+        * 2) if the exectype of write lop itself is marked MR i.e., memory 
estimate > memory budget.
+        * 
+        * @param node low-level operator
+        * @return true if lop should be sent to MR (always false on SINGLE_NODE)
+        */
+       private static boolean sendWriteLopToMR(Lop node) 
+       {
+               if ( DMLScript.rtplatform == RUNTIME_PLATFORM.SINGLE_NODE )
+                       return false;
+               Lop in = node.getInputs().get(0);
+               Format nodeFormat = node.getOutputParameters().getFormat();
+               
+               //send write lop to MR if (1) it is marked with exec type MR 
(based on its memory estimate), or
+               //(2) if the input lop is in MR and the write format allows to 
pack it into the same job (this does
+               //not apply to csv write because MR csvwrite is a separate MR 
job type)
+               return (node.getExecType() == ExecType.MR
+                       || (in.getExecType() == ExecType.MR && nodeFormat != 
Format.CSV));
+       }
+       
+       /**
+        * Computes the memory footprint required to execute <code>node</code> 
in the mapper.
+        * It is used only for those nodes that use inputs from distributed 
cache. The returned 
+        * value is utilized in limiting the number of instructions piggybacked 
onto a single GMR mapper.
+        * 
+        * @param node low-level operator
+        * @return memory footprint (0 if no distributed cache or unhandled lop)
+        */
+       private static double computeFootprintInMapper(Lop node) {
+               // Memory limits must be checked only for nodes that use 
distributed cache
+               if ( ! node.usesDistributedCache() )
+                       // default behavior: no footprint to account for
+                       return 0.0;
+               
+               OutputParameters in1dims = 
node.getInputs().get(0).getOutputParameters();
+               OutputParameters in2dims = 
node.getInputs().get(1).getOutputParameters();
+               
+               double footprint = 0;
+               if ( node instanceof MapMult ) {
+                       int dcInputIndex = node.distributedCacheInputIndex()[0];
+                       footprint = AggBinaryOp.getMapmmMemEstimate(
+                                       in1dims.getNumRows(), 
in1dims.getNumCols(), in1dims.getRowsInBlock(), in1dims.getColsInBlock(), 
in1dims.getNnz(),
+                                       in2dims.getNumRows(), 
in2dims.getNumCols(), in2dims.getRowsInBlock(), in2dims.getColsInBlock(), 
in2dims.getNnz(),
+                                       dcInputIndex, false);
+               }
+               else if ( node instanceof PMMJ ) {
+                       int dcInputIndex = node.distributedCacheInputIndex()[0];
+                       footprint = AggBinaryOp.getMapmmMemEstimate(
+                                       in1dims.getNumRows(), 1, 
in1dims.getRowsInBlock(), in1dims.getColsInBlock(), in1dims.getNnz(),
+                                       in2dims.getNumRows(), 
in2dims.getNumCols(), in2dims.getRowsInBlock(), in2dims.getColsInBlock(), 
in2dims.getNnz(), 
+                                       dcInputIndex, true);
+               }
+               else if ( node instanceof AppendM ) {
+                       footprint = BinaryOp.footprintInMapper(
+                                       in1dims.getNumRows(), 
in1dims.getNumCols(), 
+                                       in2dims.getNumRows(), 
in2dims.getNumCols(), 
+                                       in1dims.getRowsInBlock(), 
in1dims.getColsInBlock());
+               }
+               else if ( node instanceof BinaryM ) {
+                       footprint = BinaryOp.footprintInMapper(
+                                       in1dims.getNumRows(), 
in1dims.getNumCols(), 
+                                       in2dims.getNumRows(), 
in2dims.getNumCols(), 
+                                       in1dims.getRowsInBlock(), 
in1dims.getColsInBlock());
+               }
+               else {
+                       // default behavior: lop type without footprint estimate
+                       return 0.0;
+               }
+               return footprint;
+       }
+       
+       /**
+        * Determines if <code>node</code> can be executed in current round of MR
+        * jobs or if it needs to be queued for later rounds. Nodes that do not use
+        * distributed cache are always accepted. Otherwise, if the total estimated
+        * footprint (<code>node</code> and previously added nodes in GMR) does not
+        * exceed the mapper memory budget, <code>node</code> can be executed in
+        * current round and <code>true</code> is returned. If the budget is
+        * exceeded, <code>node</code> must be queued and <code>false</code> is returned.
+        * 
+        * @param node low-level operator
+        * @param footprintInMapper mapper footprint
+        * @return true if node can be executed in current round of jobs
+        */
+       private static boolean checkMemoryLimits(Lop node, double 
footprintInMapper) {
+               boolean addNode = true;
+               
+               // Memory limits must be checked only for nodes that use 
distributed cache
+               if ( ! node.usesDistributedCache() )
+                       // default behavior
+                       return addNode;
+               
+               double memBudget = Math.min(AggBinaryOp.MAPMULT_MEM_MULTIPLIER, 
BinaryOp.APPEND_MEM_MULTIPLIER) * OptimizerUtils.getRemoteMemBudgetMap(true);
+               if ( footprintInMapper <= memBudget ) 
+                       return addNode;
+               else
+                       return !addNode;
        }
 
-       private boolean compatibleWithChildrenInExecNodes(ArrayList<Lop> 
execNodes, Lop node) {
+       private boolean compatibleWithChildrenInExecNodes(List<Lop> execNodes, 
Lop node) {
          for( Lop tmpNode : execNodes ) {
            // for lops that execute in control program, compatibleJobs 
property is set to LopProperties.INVALID
            // we should not consider such lops in this check
@@ -1211,7 +1272,7 @@ public class Dag<N extends Lop>
         * @param varName variable name
         * @param deleteInst list of instructions
         */
-       private static void excludeRemoveInstruction(String varName, 
ArrayList<Instruction> deleteInst) {
+       private static void excludeRemoveInstruction(String varName, 
List<Instruction> deleteInst) {
                for(int i=0; i < deleteInst.size(); i++) {
                        Instruction inst = deleteInst.get(i);
                        if ((inst.getType() == IType.CONTROL_PROGRAM  || 
inst.getType() == IType.SPARK)
@@ -1229,7 +1290,7 @@ public class Dag<N extends Lop>
         * @param inst list of instructions
         * @param delteInst list of instructions
         */
-       private static void processConsumersForInputs(Lop node, 
ArrayList<Instruction> inst, ArrayList<Instruction> delteInst) {
+       private static void processConsumersForInputs(Lop node, 
List<Instruction> inst, List<Instruction> delteInst) {
                // reduce the consumer count for all input lops
                // if the count becomes zero, then then variable associated w/ 
input can be removed
                for(Lop in : node.getInputs() ) {
@@ -1242,7 +1303,7 @@ public class Dag<N extends Lop>
                }
        }
        
-       private static void processConsumers(Lop node, ArrayList<Instruction> 
inst, ArrayList<Instruction> deleteInst, Lop locationInfo) {
+       private static void processConsumers(Lop node, List<Instruction> inst, 
List<Instruction> deleteInst, Lop locationInfo) {
                // reduce the consumer count for all input lops
                // if the count becomes zero, then then variable associated w/ 
input can be removed
                if ( node.removeConsumer() == 0 ) {
@@ -1272,8 +1333,8 @@ public class Dag<N extends Lop>
         * @param writeInst list of write instructions
         * @param deleteInst list of delete instructions
         */
-       private void generateControlProgramJobs(ArrayList<Lop> execNodes,
-                       ArrayList<Instruction> inst, ArrayList<Instruction> 
writeInst, ArrayList<Instruction> deleteInst) {
+       private void generateControlProgramJobs(List<Lop> execNodes,
+               List<Instruction> inst, List<Instruction> writeInst, 
List<Instruction> deleteInst) {
 
                // nodes to be deleted from execnodes
                ArrayList<Lop> markedNodes = new ArrayList<>();
@@ -1549,9 +1610,8 @@ public class Dag<N extends Lop>
         * @param queuedNodes list of queued nodes
         * @param jobvec list of lists of low-level operators
         */
-       private void removeNodesForNextIteration(Lop node, ArrayList<Lop> 
finishedNodes,
-                       ArrayList<Lop> execNodes, ArrayList<Lop> queuedNodes,
-                       ArrayList<ArrayList<Lop>> jobvec) {
+       private void removeNodesForNextIteration(Lop node, List<Lop> 
finishedNodes,
+               List<Lop> execNodes, List<Lop> queuedNodes, List<List<Lop>> 
jobvec) {
                
                // only queued nodes with multiple inputs need to be handled.
                if (node.getInputs().size() == 1)
@@ -1572,7 +1632,7 @@ public class Dag<N extends Lop>
                        LOG.trace("  Before remove nodes for next iteration -- 
size of execNodes " + execNodes.size());
 
                // Determine if <code>node</code> has inputs from the same job 
or multiple jobs
-           int jobid = Integer.MIN_VALUE;
+               int jobid = Integer.MIN_VALUE;
                boolean inputs_in_same_job = true;
                for( Lop input : node.getInputs() ) {
                        int input_jobid = jobType(input, jobvec);
@@ -1664,7 +1724,6 @@ public class Dag<N extends Lop>
                                        if(queueit) {
                                                if( LOG.isTraceEnabled() )
                                                        LOG.trace("    Removing 
for next iteration (code " + code + "): (" + tmpNode.getID() + ") " + 
tmpNode.toString());
-                                               
                                                markedNodes.add(tmpNode);
                                        }
                                }
@@ -1723,23 +1782,19 @@ public class Dag<N extends Lop>
                }
        }
 
-       private boolean branchCanBePiggyBackedReduce(Lop tmpNode, Lop node, 
ArrayList<Lop> execNodes, ArrayList<Lop> queuedNodes) {
+       private boolean branchCanBePiggyBackedReduce(Lop tmpNode, Lop node, 
List<Lop> execNodes, List<Lop> queuedNodes) {
                if(node.getExecLocation() != ExecLocation.Reduce)
                        return false;
-           
                // if tmpNode is descendant of any queued child of node, then 
branch can not be piggybacked
                for(Lop ni : node.getInputs()) {
                        if(queuedNodes.contains(ni) && isChild(tmpNode, ni, 
IDMap))
                                return false;
                }
-               
                for( Lop n : execNodes ) {
                   if(n.equals(node))
                           continue;
-       
                   if(n.equals(tmpNode) && n.getExecLocation() != 
ExecLocation.Map && n.getExecLocation() != ExecLocation.MapOrReduce)
                           return false;
-       
                   // check if n is on the branch tmpNode->*->node
                   if(isChild(n, node, IDMap) && isChild(tmpNode, n, IDMap)) {
                           if(!node.getInputs().contains(tmpNode) // redundant
@@ -1750,7 +1805,7 @@ public class Dag<N extends Lop>
           return true;
        }
 
-       private boolean branchCanBePiggyBackedMap(Lop tmpNode, Lop node, 
ArrayList<Lop> execNodes, ArrayList<Lop> queuedNodes, ArrayList<Lop> 
markedNodes) {
+       private boolean branchCanBePiggyBackedMap(Lop tmpNode, Lop node, 
List<Lop> execNodes, List<Lop> queuedNodes, List<Lop> markedNodes) {
                if(node.getExecLocation() != ExecLocation.Map)
                        return false;
                
@@ -1811,17 +1866,13 @@ public class Dag<N extends Lop>
         * @param queuedNodes list of queued nodes
         * @return true if tmpNode can be piggbacked on node
         */
-       private boolean branchCanBePiggyBackedMapAndReduce(Lop tmpNode, Lop 
node,
-                       ArrayList<Lop> execNodes, ArrayList<Lop> queuedNodes) {
-
+       private boolean branchCanBePiggyBackedMapAndReduce(Lop tmpNode, Lop 
node, List<Lop> execNodes, List<Lop> queuedNodes) {
                if (node.getExecLocation() != ExecLocation.MapAndReduce)
                        return false;
                JobType jt = JobType.findJobTypeFromLop(node);
-
                for ( Lop n : execNodes ) {
                        if (n.equals(node))
                                continue;
-
                        // Evaluate only nodes on the branch between 
tmpNode->..->node
                        if (n.equals(tmpNode) || (isChild(n, node, IDMap) && 
isChild(tmpNode, n, IDMap))) {
                                if ( hasOtherMapAndReduceParentNode(tmpNode, 
queuedNodes,node) )
@@ -1836,8 +1887,8 @@ public class Dag<N extends Lop>
                return true;
        }
 
-  private boolean branchHasNoOtherUnExecutedParents(Lop tmpNode, Lop node,
-      ArrayList<Lop> execNodes, ArrayList<Lop> finishedNodes) {
+       private boolean branchHasNoOtherUnExecutedParents(Lop tmpNode, Lop node,
+               List<Lop> execNodes, List<Lop> finishedNodes) {
 
          //if tmpNode has more than one unfinished output, return false 
          if(tmpNode.getOutputs().size() > 1)
@@ -1860,7 +1911,7 @@ public class Dag<N extends Lop>
            {      
              int cnt = 0;
              for (Lop output : n.getOutputs() ) {
-               if (!finishedNodes.contains(output))    
+               if (!finishedNodes.contains(output))
                  cnt++;
              } 
              
@@ -1879,7 +1930,7 @@ public class Dag<N extends Lop>
         * @param jobvec list of lists of low-level operators
         * @return job index for a low-level operator
         */
-       private static int jobType(Lop lops, ArrayList<ArrayList<Lop>> jobvec) {
+       private static int jobType(Lop lops, List<List<Lop>> jobvec) {
                for ( JobType jt : JobType.values()) {
                        int i = jt.getId();
                        if (i > 0 && jobvec.get(i) != null && 
jobvec.get(i).contains(lops)) {
@@ -1898,12 +1949,9 @@ public class Dag<N extends Lop>
         * @param node low-level operator
         * @return true if MapAndReduce node between tmpNode and node in 
nodeList
         */
-       private boolean hasOtherMapAndReduceParentNode(Lop tmpNode,
-                       ArrayList<Lop> nodeList, Lop node) {
-               
+       private boolean hasOtherMapAndReduceParentNode(Lop tmpNode, List<Lop> 
nodeList, Lop node) {
                if ( tmpNode.getExecLocation() == ExecLocation.MapAndReduce)
                        return true;
-               
                for ( Lop n : tmpNode.getOutputs() ) {
                        if ( nodeList.contains(n) && isChild(n,node,IDMap)) {
                                if(!n.equals(node) && n.getExecLocation() == 
ExecLocation.MapAndReduce)
@@ -1912,7 +1960,6 @@ public class Dag<N extends Lop>
                                        return 
hasOtherMapAndReduceParentNode(n, nodeList, node);
                        }
                }
-
                return false;
        }
 
@@ -1924,7 +1971,7 @@ public class Dag<N extends Lop>
         * @param node low-level operator
         * @return true if there is a queued node that is a parent of tmpNode 
and node
         */
-       private boolean hasOtherQueuedParentNode(Lop tmpNode, ArrayList<Lop> 
queuedNodes, Lop node) {
+       private boolean hasOtherQueuedParentNode(Lop tmpNode, List<Lop> 
queuedNodes, Lop node) {
                if ( queuedNodes.isEmpty() )
                        return false;
                
@@ -1947,22 +1994,20 @@ public class Dag<N extends Lop>
         * 
         * @param jobNodes list of lists of low-level operators
         */
-       private static void printJobNodes(ArrayList<ArrayList<Lop>> jobNodes)
-       {
-               if (LOG.isTraceEnabled()){
-                       for ( JobType jt : JobType.values() ) {
-                               int i = jt.getId();
-                               if (i > 0 && jobNodes.get(i) != null && 
!jobNodes.get(i).isEmpty() ) {
-                                       LOG.trace(jt.getName() + " Job Nodes:");
-                                       
-                                       for (int j = 0; j < 
jobNodes.get(i).size(); j++) {
-                                               LOG.trace("    "
-                                                               + 
jobNodes.get(i).get(j).getID() + ") "
-                                                               + 
jobNodes.get(i).get(j).toString());
-                                       }
+       private static void printJobNodes(List<List<Lop>> jobNodes) {
+               if( !LOG.isTraceEnabled() )
+                       return;
+               for ( JobType jt : JobType.values() ) {
+                       int i = jt.getId();
+                       if (i > 0 && jobNodes.get(i) != null && 
!jobNodes.get(i).isEmpty() ) {
+                               LOG.trace(jt.getName() + " Job Nodes:");
+                               
+                               for (int j = 0; j < jobNodes.get(i).size(); 
j++) {
+                                       LOG.trace("    "
+                                                       + 
jobNodes.get(i).get(j).getID() + ") "
+                                                       + 
jobNodes.get(i).get(j).toString());
                                }
                        }
-                       
                }
        }
 
@@ -1973,7 +2018,7 @@ public class Dag<N extends Lop>
         * @param loc exec location
         * @return true if there is a node with RecordReader exec location
         */
-       private static boolean hasANode(ArrayList<Lop> nodes, ExecLocation loc) 
{
+       private static boolean hasANode(List<Lop> nodes, ExecLocation loc) {
                for ( Lop n : nodes ) {
                        if (n.getExecLocation() == ExecLocation.RecordReader)
                                return true;
@@ -1981,7 +2026,7 @@ public class Dag<N extends Lop>
                return false;
        }
 
-       private ArrayList<ArrayList<Lop>> 
splitGMRNodesByRecordReader(ArrayList<Lop> gmrnodes) 
+       private List<List<Lop>> splitGMRNodesByRecordReader(List<Lop> gmrnodes) 
        {
                // obtain the list of record reader nodes
                ArrayList<Lop> rrnodes = new ArrayList<>();
@@ -1992,7 +2037,7 @@ public class Dag<N extends Lop>
 
                // We allocate one extra vector to hold lops that do not depend 
on any
                // recordreader lops
-               ArrayList<ArrayList<Lop>> splitGMR = 
createNodeVectors(rrnodes.size() + 1);
+               List<List<Lop>> splitGMR = createNodeVectors(rrnodes.size() + 
1);
 
                // flags to indicate whether a lop has been added to one of the 
node vectors
                boolean[] flags = new boolean[gmrnodes.size()];
@@ -2039,10 +2084,8 @@ public class Dag<N extends Lop>
         * @param deleteinst list of delete instructions
         * @param jobNodes list of list of low-level operators
         */
-       private void generateMRJobs(ArrayList<Lop> execNodes,
-                       ArrayList<Instruction> inst,
-                       ArrayList<Instruction> writeinst,
-                       ArrayList<Instruction> deleteinst, 
ArrayList<ArrayList<Lop>> jobNodes)
+       private void generateMRJobs(List<Lop> execNodes, List<Instruction> inst,
+               List<Instruction> writeinst, List<Instruction> deleteinst, 
List<List<Lop>> jobNodes)
        {
                printJobNodes(jobNodes);
                
@@ -2054,7 +2097,7 @@ public class Dag<N extends Lop>
                                continue;
                        
                        int index = jt.getId(); // job id is used as an index 
into jobNodes
-                       ArrayList<Lop> currNodes = jobNodes.get(index);
+                       List<Lop> currNodes = jobNodes.get(index);
                        
                        // generate MR job
                        if (currNodes != null && !currNodes.isEmpty() ) {
@@ -2064,7 +2107,7 @@ public class Dag<N extends Lop>
 
                                if (jt.allowsRecordReaderInstructions() && 
hasANode(jobNodes.get(index), ExecLocation.RecordReader)) {
                                        // split the nodes by recordReader lops
-                                       ArrayList<ArrayList<Lop>> rrlist = 
splitGMRNodesByRecordReader(jobNodes.get(index));
+                                       List<List<Lop>> rrlist = 
splitGMRNodesByRecordReader(jobNodes.get(index));
                                        for (int i = 0; i < rrlist.size(); i++) 
{
                                                
generateMapReduceInstructions(rrlist.get(i), inst, writeinst, deleteinst, 
rmvarinst, jt);
                                        }
@@ -2120,7 +2163,7 @@ public class Dag<N extends Lop>
         * @param node_v list of nodes
         * @param exec_n list of nodes
         */
-       private void addParents(Lop node, ArrayList<Lop> node_v, ArrayList<Lop> 
exec_n) {
+       private void addParents(Lop node, List<Lop> node_v, List<Lop> exec_n) {
                for (Lop enode : exec_n ) {
                        if (isChild(node, enode, IDMap)) {
                                if (!node_v.contains(enode)) {
@@ -2139,7 +2182,7 @@ public class Dag<N extends Lop>
         * @param node_v list of nodes
         * @param exec_n list of nodes
         */
-       private static void addChildren(Lop node, ArrayList<Lop> node_v, 
ArrayList<Lop> exec_n) {
+       private static void addChildren(Lop node, List<Lop> node_v, List<Lop> 
exec_n) {
 
                // add child in exec nodes that is not of type scalar
                if (exec_n.contains(node)
@@ -2624,9 +2667,8 @@ public class Dag<N extends Lop>
         * @param rmvarinst list of rmvar instructions
         * @param jt job type
         */
-       private void generateMapReduceInstructions(ArrayList<Lop> execNodes,
-                       ArrayList<Instruction> inst, ArrayList<Instruction> 
writeinst, ArrayList<Instruction> deleteinst, ArrayList<Instruction> rmvarinst, 
-                       JobType jt)
+       private void generateMapReduceInstructions(List<Lop> execNodes, 
List<Instruction> inst,
+               List<Instruction> writeinst, List<Instruction> deleteinst, 
List<Instruction> rmvarinst, JobType jt)
        {
                ArrayList<Byte> resultIndices = new ArrayList<>();
                ArrayList<String> inputs = new ArrayList<>();
@@ -2875,7 +2917,7 @@ public class Dag<N extends Lop>
         * @param inputStrings list of input strings
         * @return Lop.INSTRUCTION_DELIMITOR separated string
         */
-       private static String getCSVString(ArrayList<String> inputStrings) {
+       private static String getCSVString(List<String> inputStrings) {
                StringBuilder sb = new StringBuilder();
                for ( String str : inputStrings ) {
                        if( str != null ) {
@@ -2902,13 +2944,9 @@ public class Dag<N extends Lop>
         * @param MRJobLineNumbers MR job line numbers
         * @return -1 if problem
         */
-       private int getAggAndOtherInstructions(Lop node, ArrayList<Lop> 
execNodes,
-                       ArrayList<String> shuffleInstructions,
-                       ArrayList<String> aggInstructionsReducer,
-                       ArrayList<String> otherInstructionsReducer,
-                       HashMap<Lop, Integer> nodeIndexMapping, int[] 
start_index,
-                       ArrayList<String> inputLabels, ArrayList<Lop> 
inputLops, 
-                       ArrayList<Integer> MRJobLineNumbers)
+       private int getAggAndOtherInstructions(Lop node, List<Lop> execNodes, 
List<String> shuffleInstructions,
+               List<String> aggInstructionsReducer, List<String> 
otherInstructionsReducer, Map<Lop, Integer> nodeIndexMapping,
+               int[] start_index,List<String> inputLabels, List<Lop> 
inputLops, List<Integer> MRJobLineNumbers)
        {
                int ret_val = -1;
 
@@ -2916,7 +2954,6 @@ public class Dag<N extends Lop>
                        return nodeIndexMapping.get(node);
 
                // if not an input source and not in exec nodes, return.
-
                if (!execNodes.contains(node))
                        return ret_val;
 
@@ -3158,12 +3195,9 @@ public class Dag<N extends Lop>
         * @param MRJobLineNumbers MR job line numbers
         * @return -1 if problem
         */
-       private static int getRecordReaderInstructions(Lop node, ArrayList<Lop> 
execNodes,
-                       ArrayList<String> inputStrings,
-                       ArrayList<String> recordReaderInstructions,
-                       HashMap<Lop, Integer> nodeIndexMapping, int[] 
start_index,
-                       ArrayList<String> inputLabels, ArrayList<Lop> inputLops,
-                       ArrayList<Integer> MRJobLineNumbers)
+       private static int getRecordReaderInstructions(Lop node, List<Lop> 
execNodes, List<String> inputStrings,
+               List<String> recordReaderInstructions, Map<Lop, Integer> 
nodeIndexMapping, int[] start_index,
+               List<String> inputLabels, List<Lop> inputLops, List<Integer> 
MRJobLineNumbers)
        {
                // if input source, return index
                if (nodeIndexMapping.containsKey(node))
@@ -3258,12 +3292,9 @@ public class Dag<N extends Lop>
         * @param MRJoblineNumbers MR job line numbers
         * @return -1 if problem
         */
-       private int getMapperInstructions(Lop node, ArrayList<Lop> execNodes,
-                       ArrayList<String> inputStrings,
-                       ArrayList<String> instructionsInMapper,
-                       HashMap<Lop, Integer> nodeIndexMapping, int[] 
start_index,
-                       ArrayList<String> inputLabels, ArrayList<Lop> 
inputLops, 
-                       ArrayList<Integer> MRJobLineNumbers)
+       private int getMapperInstructions(Lop node, List<Lop> execNodes, 
List<String> inputStrings,
+               List<String> instructionsInMapper, Map<Lop, Integer> 
nodeIndexMapping, int[] start_index,
+               List<String> inputLabels, List<Lop> inputLops, List<Integer> 
MRJobLineNumbers)
        {
                // if input source, return index
                if (nodeIndexMapping.containsKey(node))
@@ -3393,12 +3424,10 @@ public class Dag<N extends Lop>
        }
 
        // Method to populate inputs and also populates node index mapping.
-       private static void getInputPathsAndParameters(Lop node, ArrayList<Lop> execNodes,
-                       ArrayList<String> inputStrings, ArrayList<InputInfo> inputInfos,
-                       ArrayList<Long> numRows, ArrayList<Long> numCols,
-                       ArrayList<Long> numRowsPerBlock, ArrayList<Long> numColsPerBlock,
-                       HashMap<Lop, Integer> nodeIndexMapping, ArrayList<String> inputLabels, 
-                       ArrayList<Lop> inputLops, ArrayList<Integer> MRJobLineNumbers) {
+       private static void getInputPathsAndParameters(Lop node, List<Lop> execNodes,
+                       List<String> inputStrings, List<InputInfo> inputInfos, List<Long> numRows, List<Long> numCols,
+                       List<Long> numRowsPerBlock, List<Long> numColsPerBlock, Map<Lop, Integer> nodeIndexMapping,
+                       List<String> inputLabels, List<Lop> inputLops, List<Integer> MRJobLineNumbers) {
                // treat rand as an input.
                if (node.getType() == Type.DataGen && execNodes.contains(node)
                                && !nodeIndexMapping.containsKey(node)) {
@@ -3514,7 +3543,7 @@ public class Dag<N extends Lop>
         * @param rootNodes list of root nodes
         * @param jt job type
         */
-       private static void getOutputNodes(ArrayList<Lop> execNodes, ArrayList<Lop> rootNodes, JobType jt) {
+       private static void getOutputNodes(List<Lop> execNodes, List<Lop> rootNodes, JobType jt) {
                for ( Lop node : execNodes ) {
                        // terminal node
                        if (node.getOutputs().isEmpty() && !rootNodes.contains(node)) {
@@ -3554,93 +3583,10 @@ public class Dag<N extends Lop>
         * @param IDMap id map
         * @return true if a child of b
         */
-       private static boolean isChild(Lop a, Lop b, HashMap<Long, Integer> IDMap) {
+       private static boolean isChild(Lop a, Lop b, Map<Long, Integer> IDMap) {
                int bID = IDMap.get(b.getID());
                return a.get_reachable()[bID];
        }
-       
-       /**
-        * Sort the lops by topological order.
-        * 
-        *  1) All nodes with level i appear prior to the nodes in level i+1.
-        *  2) All nodes within a level are ordered by their ID i.e., in the order
-        *  they are created
-        * 
-        * @param v list of lops
-        */
-       @SuppressWarnings({ "unchecked", "rawtypes" })
-       private ArrayList<Lop> doTopologicalSortStrictOrder(ArrayList<Lop> v) {
-               /*
-                * Step 1: compute the level for each node in the DAG. Level for each node is 
-                *   computed as lops are created. So, this step is need not be performed here.
-                * Step 2: sort the nodes by level, and within a level by node ID.
-                */
-               
-               // Step1: Performed at the time of creating Lops
-               
-               // Step2: sort nodes by level, and then by node ID
-               Lop[] nodearray = v.toArray(new Lop[0]);
-               Arrays.sort(nodearray, new LopComparator());
-
-               return createIDMapping(nodearray);
-       }
-       
-       private ArrayList<Lop> doTopologicalSortTwoLevelOrder(ArrayList<Lop> v) {
-               //partition nodes into leaf/inner nodes and dag root nodes,
-               //+ sort leaf/inner nodes by ID to force depth-first scheduling
-               //+ append root nodes in order of their original definition 
-               //  (which also preserves the original order of prints)
-               Lop[] nodearray = Stream.concat(
-                       v.stream().filter(l -> !l.getOutputs().isEmpty()).sorted(Comparator.comparing(l -> l.getID())),
-                       v.stream().filter(l -> l.getOutputs().isEmpty()))
-                       .toArray(Lop[]::new);
-               
-               return createIDMapping(nodearray);
-       }
-       
-       private ArrayList<Lop> createIDMapping(Lop[] nodearray) {
-               // Copy sorted nodes into "v" and construct a mapping between Lop IDs and sequence of numbers
-               ArrayList<Lop> ret = new ArrayList<>();
-               IDMap.clear();
-               
-               for (int i = 0; i < nodearray.length; i++) {
-                       ret.add(nodearray[i]);
-                       IDMap.put(nodearray[i].getID(), i);
-               }
-               
-               /*
-                * Compute of All-pair reachability graph (Transitive Closure) of the DAG.
-                * - Perform a depth-first search (DFS) from every node $u$ in the DAG 
-                * - and construct the list of reachable nodes from the node $u$
-                * - store the constructed reachability information in $u$.reachable[] boolean array
-                */
-               for (int i = 0; i < nodearray.length; i++) {
-                       dagDFS(nodearray[i], nodearray[i]
-                               .create_reachable(nodearray.length));
-               }
-
-               // print the nodes in sorted order
-               if (LOG.isTraceEnabled()) {
-                       for ( Lop vnode : ret ) {
-                               StringBuilder sb = new StringBuilder();
-                               sb.append(vnode.getID());
-                               sb.append("(");
-                               sb.append(vnode.getLevel());
-                               sb.append(") ");
-                               sb.append(vnode.getType());
-                               sb.append("(");
-                               for(Lop vin : vnode.getInputs()) {
-                                       sb.append(vin.getID());
-                                       sb.append(",");
-                               }
-                               sb.append("), ");
-                               LOG.trace(sb.toString());
-                       }
-                       LOG.trace("topological sort -- done");
-               }
-               
-               return ret;
-       }
 
        /**
         * Method to perform depth-first traversal from a given node in the DAG.
@@ -3663,7 +3609,7 @@ public class Dag<N extends Lop>
                }
        }
        
-       private static boolean hasDirectChildNode(Lop node, ArrayList<Lop> childNodes) {
+       private static boolean hasDirectChildNode(Lop node, List<Lop> childNodes) {
                if ( childNodes.isEmpty() ) 
                        return false;
                for( Lop cnode : childNodes ) {
@@ -3673,11 +3619,11 @@ public class Dag<N extends Lop>
                return false;
        }
        
-       private boolean hasChildNode(Lop node, ArrayList<Lop> nodes) {
+       private boolean hasChildNode(Lop node, List<Lop> nodes) {
                return hasChildNode(node, nodes, ExecLocation.INVALID);
        }
 
-       private boolean hasChildNode(Lop node, ArrayList<Lop> childNodes, ExecLocation type) {
+       private boolean hasChildNode(Lop node, List<Lop> childNodes, ExecLocation type) {
                if ( childNodes.isEmpty() ) 
                        return false;
                int index = IDMap.get(node.getID());
@@ -3688,7 +3634,7 @@ public class Dag<N extends Lop>
                return false;
        }
        
-       private Lop getChildNode(Lop node, ArrayList<Lop> childNodes, ExecLocation type) {
+       private Lop getChildNode(Lop node, List<Lop> childNodes, ExecLocation type) {
                if ( childNodes.isEmpty() )
                        return null;
                int index = IDMap.get(node.getID());
@@ -3708,7 +3654,7 @@ public class Dag<N extends Lop>
         * Returns null if no such "n" exists
         * 
         */
-       private Lop getParentNode(Lop node, ArrayList<Lop> parentNodes, ExecLocation type) {
+       private Lop getParentNode(Lop node, List<Lop> parentNodes, ExecLocation type) {
                if ( parentNodes.isEmpty() )
                        return null;
                for( Lop pn : parentNodes ) {
@@ -3721,7 +3667,7 @@ public class Dag<N extends Lop>
 
        // Checks if "node" has any descendants in nodesVec with definedMRJob flag
        // set to true
-       private boolean hasMRJobChildNode(Lop node, ArrayList<Lop> nodesVec) {
+       private boolean hasMRJobChildNode(Lop node, List<Lop> nodesVec) {
                if ( nodesVec.isEmpty() )
                        return false;
                
@@ -3733,7 +3679,7 @@ public class Dag<N extends Lop>
                return false;
        }
 
-       private boolean checkDataGenAsChildNode(Lop node, ArrayList<Lop> nodesVec) {
+       private boolean checkDataGenAsChildNode(Lop node, List<Lop> nodesVec) {
                if( nodesVec.isEmpty() )
                        return true;
                
@@ -3747,7 +3693,7 @@ public class Dag<N extends Lop>
                return onlyDatagen;
        }
 
-       private static int getChildAlignment(Lop node, ArrayList<Lop> execNodes, ExecLocation type) 
+       private static int getChildAlignment(Lop node, List<Lop> execNodes, ExecLocation type) 
        {
                for (Lop n : node.getInputs() ) {
                        if (!execNodes.contains(n))
@@ -3780,7 +3726,7 @@ public class Dag<N extends Lop>
                return MRCHILD_NOT_FOUND;
        }
 
-       private boolean hasParentNode(Lop node, ArrayList<Lop> parentNodes) {
+       private boolean hasParentNode(Lop node, List<Lop> parentNodes) {
                if ( parentNodes.isEmpty() )
                        return false;           
                for( Lop pnode : parentNodes ) {
@@ -3798,9 +3744,9 @@ public class Dag<N extends Lop>
         * @param insts list of instructions
         * @return new list of potentially modified instructions
         */
-       private static ArrayList<Instruction> cleanupInstructions(ArrayList<Instruction> insts) {
+       private static ArrayList<Instruction> cleanupInstructions(List<Instruction> insts) {
                //step 1: create mvvar instructions: assignvar s1 s2, rmvar s1 -> mvvar s1 s2
-               ArrayList<Instruction> tmp1 = collapseAssignvarAn

<TRUNCATED>

Reply via email to