Repository: systemml
Updated Branches:
  refs/heads/master e270960ca -> 55ce4853c
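The change below routes CP/Spark compilation around the MR-specific greedy grouping: lops are ordered by a lightweight two-level sort (inner nodes by ID, then root nodes in definition order) and translated directly into instructions via doPlainInstructionGen, avoiding the quadratic transitive-closure computation that the MR path still needs. A minimal, self-contained sketch of that two-level ordering, using a hypothetical Node type in place of the actual Lop API:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class TwoLevelOrderSketch {
    // hypothetical stand-in for Lop: an ID plus the consumers of this node's output
    static class Node {
        final long id;
        final List<Node> outputs = new ArrayList<>();
        Node(long id) { this.id = id; }
    }

    // inner nodes (non-empty outputs) sorted by ID to force depth-first scheduling,
    // followed by root nodes (empty outputs) in their original definition order
    static List<Node> twoLevelOrder(List<Node> v) {
        return Stream.concat(
            v.stream().filter(n -> !n.outputs.isEmpty())
                .sorted(Comparator.comparingLong(n -> n.id)),
            v.stream().filter(n -> n.outputs.isEmpty()))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Node a = new Node(1), b = new Node(2), root = new Node(3);
        a.outputs.add(root);
        b.outputs.add(root);
        // definition order lists the root first and the inner nodes out of ID order
        List<Node> ordered = twoLevelOrder(Arrays.asList(root, b, a));
        ordered.forEach(n -> System.out.println(n.id)); // prints 1, 2, 3
    }
}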
http://git-wip-us.apache.org/repos/asf/systemml/blob/55ce4853/src/main/java/org/apache/sysml/lops/compile/Dag.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/lops/compile/Dag.java b/src/main/java/org/apache/sysml/lops/compile/Dag.java index bc080d3..791fa01 100644 --- a/src/main/java/org/apache/sysml/lops/compile/Dag.java +++ b/src/main/java/org/apache/sysml/lops/compile/Dag.java @@ -21,10 +21,15 @@ package org.apache.sysml.lops.compile; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import java.util.stream.Stream; import org.apache.commons.logging.Log; @@ -105,26 +110,21 @@ public class Dag<N extends Lop> private int total_reducers = -1; private String scratch = ""; private String scratchFilePath = null; + + // list of all nodes in the dag + private ArrayList<Lop> nodes = null; + // HashMap that translates the nodes in the DAG into a sequence of numbers + // key: Lop ID + // value: Sequence Number (0 ... |DAG|) + // This map is primarily used in performing DFS on the DAG, and subsequently in performing ancestor-descendant checks. + private HashMap<Long, Integer> IDMap = null; private double gmrMapperFootprint = 0; - + static { job_id = new IDSequence(); var_index = new IDSequence(); } - - // list of all nodes in the dag - private ArrayList<Lop> nodes = null; - - /* - * Hashmap to translates the nodes in the DAG to a sequence of numbers - * key: Lop ID - * value: Sequence Number (0 ... |DAG|) - * - * This map is primarily used in performing DFS on the DAG, and subsequently in performing ancestor-descendant checks. - */ - private HashMap<Long, Integer> IDMap = null; - private static class NodeOutput { String fileName; @@ -196,9 +196,9 @@ public class Dag<N extends Lop> private String getFilePath() { if ( scratchFilePath == null ) { scratchFilePath = scratch + Lop.FILE_SEPARATOR - + Lop.PROCESS_PREFIX + DMLScript.getUUID() - + Lop.FILE_SEPARATOR + Lop.FILE_SEPARATOR - + ProgramConverter.CP_ROOT_THREAD_ID + Lop.FILE_SEPARATOR; + + Lop.PROCESS_PREFIX + DMLScript.getUUID() + + Lop.FILE_SEPARATOR + Lop.FILE_SEPARATOR + + ProgramConverter.CP_ROOT_THREAD_ID + Lop.FILE_SEPARATOR; } return scratchFilePath; } @@ -226,10 +226,9 @@ public class Dag<N extends Lop> * @param node low-level operator * @return true if node was not already present, false if not. */ - public boolean addNode(Lop node) { if (nodes.contains(node)) - return false; + return false; nodes.add(node); return true; } @@ -249,696 +248,282 @@ public class Dag<N extends Lop> // create ordering of lops (for MR, we sort by level, while for all // other exec types we use a two-level sorting of <inner, root> nodes) - ArrayList<Lop> node_v = OptimizerUtils.isHadoopExecutionMode() ? + List<Lop> node_v = OptimizerUtils.isHadoopExecutionMode() ? doTopologicalSortStrictOrder(nodes) : doTopologicalSortTwoLevelOrder(nodes); // do greedy grouping of operations - ArrayList<Instruction> inst = doGreedyGrouping(sb, node_v); + ArrayList<Instruction> inst = OptimizerUtils.isHadoopExecutionMode() ? 
+ doGreedyGrouping(sb, node_v) : doPlainInstructionGen(sb, node_v); // cleanup instruction (e.g., create packed rmvar instructions) - inst = cleanupInstructions(inst); + return cleanupInstructions(inst); + } + + + /** + * Sort the lops by topological order. + * + * 1) All nodes with level i appear prior to the nodes in level i+1. + * 2) All nodes within a level are ordered by their ID, i.e., in the order + * they are created. + * + * @param v list of lops + */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + private List<Lop> doTopologicalSortStrictOrder(List<Lop> v) { + /* + * Step 1: compute the level for each node in the DAG. Level for each node is + * computed as lops are created. So, this step need not be performed here. + * Step 2: sort the nodes by level, and within a level by node ID. + */ - return inst; + // Step1: Performed at the time of creating Lops + + // Step2: sort nodes by level, and then by node ID + Lop[] nodearray = v.toArray(new Lop[0]); + Arrays.sort(nodearray, new LopComparator()); + return createIDMapping(nodearray); } + + private List<Lop> doTopologicalSortTwoLevelOrder(List<Lop> v) { + //partition nodes into leaf/inner nodes and dag root nodes, + //+ sort leaf/inner nodes by ID to force depth-first scheduling + //+ append root nodes in order of their original definition + // (which also preserves the original order of prints) + List<Lop> nodes = Stream.concat( + v.stream().filter(l -> !l.getOutputs().isEmpty()).sorted(Comparator.comparing(l -> l.getID())), + v.stream().filter(l -> l.getOutputs().isEmpty())).collect(Collectors.toList()); - private static void deleteUpdatedTransientReadVariables(StatementBlock sb, ArrayList<Lop> nodeV, ArrayList<Instruction> inst) { - if ( sb == null ) - return; - - if( LOG.isTraceEnabled() ) - LOG.trace("In delete updated variables"); - - // CANDIDATE list of variables which could have been updated in this statement block - HashMap<String, Lop> labelNodeMapping = new HashMap<>(); - - // ACTUAL list of variables whose value is updated, AND the old value of the variable - // is no longer accessible/used. - HashSet<String> updatedLabels = new HashSet<>(); - HashMap<String, Lop> updatedLabelsLineNum = new HashMap<>(); + //NOTE: in contrast to hadoop execution modes, we avoid computing the transitive + //closure here to ensure linear time complexity because it's unnecessary for CP and Spark + return nodes; + } + + private List<Lop> createIDMapping(Lop[] nodearray) { + // Copy sorted nodes into "v" and construct a mapping between Lop IDs and sequence of numbers + ArrayList<Lop> ret = new ArrayList<>(); + IDMap.clear(); + for (int i = 0; i < nodearray.length; i++) { + ret.add(nodearray[i]); + IDMap.put(nodearray[i].getID(), i); + } - // first capture all transient read variables - for ( Lop node : nodeV ) { + // Compute the all-pairs reachability graph (transitive closure) of the DAG. 
+ // - Perform a depth-first search (DFS) from every node $u$ in the DAG + // - and construct the list of reachable nodes from the node $u$ + // - store the constructed reachability information in $u$.reachable[] boolean array + for (int i = 0; i < nodearray.length; i++) { + dagDFS(nodearray[i], nodearray[i] + .create_reachable(nodearray.length)); + } - if (node.getExecLocation() == ExecLocation.Data - && ((Data) node).isTransient() - && ((Data) node).getOperationType() == OperationTypes.READ - && ((Data) node).getDataType() == DataType.MATRIX) { - - // "node" is considered as updated ONLY IF the old value is not used any more - // So, make sure that this READ node does not feed into any (transient/persistent) WRITE - boolean hasWriteParent=false; - for(Lop p : node.getOutputs()) { - if(p.getExecLocation() == ExecLocation.Data) { - // if the "p" is of type Data, then it has to be a WRITE - hasWriteParent = true; - break; - } - } - - if ( !hasWriteParent ) { - // node has no parent of type WRITE, so this is a CANDIDATE variable - // add it to labelNodeMapping so that it is considered in further processing - labelNodeMapping.put(node.getOutputParameters().getLabel(), node); + // print the nodes in sorted order + if (LOG.isTraceEnabled()) { + for ( Lop vnode : ret ) { + StringBuilder sb = new StringBuilder(); + sb.append(vnode.getID()); + sb.append("("); + sb.append(vnode.getLevel()); + sb.append(") "); + sb.append(vnode.getType()); + sb.append("("); + for(Lop vin : vnode.getInputs()) { + sb.append(vin.getID()); + sb.append(","); } + sb.append("), "); + LOG.trace(sb.toString()); } + LOG.trace("topological sort -- done"); } + return ret; + } + + + /** + * Method to group a vector of sorted lops. + * + * @param sb statement block + * @param node_v list of low-level operators + * @return list of instructions + */ + private ArrayList<Instruction> doGreedyGrouping(StatementBlock sb, List<Lop> node_v) + { + if( LOG.isTraceEnabled() ) + LOG.trace("Grouping DAG ============"); - // capture updated transient write variables - for ( Lop node : nodeV ) { + // nodes to be executed in current iteration + List<Lop> execNodes = new ArrayList<>(); + // nodes that have already been processed + List<Lop> finishedNodes = new ArrayList<>(); + // nodes that are queued for the following iteration + List<Lop> queuedNodes = new ArrayList<>(); - if (node.getExecLocation() == ExecLocation.Data - && ((Data) node).isTransient() - && ((Data) node).getOperationType() == OperationTypes.WRITE - && ((Data) node).getDataType() == DataType.MATRIX - && labelNodeMapping.containsKey(node.getOutputParameters().getLabel()) // check to make sure corresponding (i.e., with the same label/name) transient read is present - && !labelNodeMapping.containsValue(node.getInputs().get(0)) // check to avoid cases where transient read feeds into a transient write - ) { - updatedLabels.add(node.getOutputParameters().getLabel()); - updatedLabelsLineNum.put(node.getOutputParameters().getLabel(), node); - - } - } + List<List<Lop>> jobNodes = createNodeVectors(JobType.getNumJobTypes()); - // generate RM instructions - Instruction rm_inst = null; - for ( String label : updatedLabels ) - { - rm_inst = VariableCPInstruction.prepareRemoveInstruction(label); - rm_inst.setLocation(updatedLabelsLineNum.get(label)); - - if( LOG.isTraceEnabled() ) - LOG.trace(rm_inst.toString()); - inst.add(rm_inst); - } - - } - - private static void generateRemoveInstructions(StatementBlock sb, ArrayList<Instruction> deleteInst) { + //prepare basic instruction sets + 
List<Instruction> deleteInst = new ArrayList<>(); + List<Instruction> writeInst = deleteUpdatedTransientReadVariables(sb, node_v); + List<Instruction> endOfBlockInst = generateRemoveInstructions(sb); + ArrayList<Instruction> inst = generateInstructionsForInputVariables(node_v); - if ( sb == null ) - return; + boolean done = false; + String indent = " "; - if( LOG.isTraceEnabled() ) - LOG.trace("In generateRemoveInstructions()"); - - Instruction inst = null; - // RULE 1: if in IN and not in OUT, then there should be an rmvar or rmfilevar inst - // (currently required for specific cases of external functions) - for (String varName : sb.liveIn().getVariableNames()) { - if (!sb.liveOut().containsVariable(varName)) { - inst = VariableCPInstruction.prepareRemoveInstruction(varName); - inst.setLocation(sb.getFilename(), sb.getEndLine(), sb.getEndLine(), -1, -1); - - deleteInst.add(inst); + while (!done) { + if( LOG.isTraceEnabled() ) + LOG.trace("Grouping nodes in DAG"); - if( LOG.isTraceEnabled() ) - LOG.trace(" Adding " + inst.toString()); - } - } - } + execNodes.clear(); + queuedNodes.clear(); + clearNodeVectors(jobNodes); + gmrMapperFootprint=0; - private static ArrayList<ArrayList<Lop>> createNodeVectors(int size) { - ArrayList<ArrayList<Lop>> arr = new ArrayList<>(); + for ( Lop node : node_v ) { + // finished nodes don't need to be processed + if (finishedNodes.contains(node)) + continue; - // for each job type, we need to create a vector. - // additionally, create another vector for execNodes - for (int i = 0; i < size; i++) { - arr.add(new ArrayList<Lop>()); - } - return arr; - } + if( LOG.isTraceEnabled() ) + LOG.trace("Processing node (" + node.getID() + + ") " + node.toString() + " exec nodes size is " + execNodes.size()); + + + //if node defines MR job, make sure it is compatible with all + //its children nodes in execNodes + if(node.definesMRJob() && !compatibleWithChildrenInExecNodes(execNodes, node)) + { + if( LOG.isTraceEnabled() ) + LOG.trace(indent + "Queueing node " + + node.toString() + " (code 1)"); + + queuedNodes.add(node); + removeNodesForNextIteration(node, finishedNodes, execNodes, queuedNodes, jobNodes); + continue; + } - private static void clearNodeVectors(ArrayList<ArrayList<Lop>> arr) { - for (ArrayList<Lop> tmp : arr) { - tmp.clear(); - } - } + // if child is queued, this node will be processed in the later + // iteration + if (hasChildNode(node,queuedNodes)) { - private static boolean isCompatible(ArrayList<Lop> nodes, JobType jt, int from, int to) { - int base = jt.getBase(); - for ( Lop node : nodes ) { - if ((node.getCompatibleJobs() & base) == 0) { - if( LOG.isTraceEnabled() ) - LOG.trace("Not compatible "+ node.toString()); - return false; - } - } - return true; - } + if( LOG.isTraceEnabled() ) + LOG.trace(indent + "Queueing node " + + node.toString() + " (code 2)"); + queuedNodes.add(node); - /** - * Function that determines if the two input nodes can be executed together - * in at least one job. - * - * @param node1 low-level operator 1 - * @param node2 low-level operator 2 - * @return true if nodes can be executed together - */ - private static boolean isCompatible(Lop node1, Lop node2) { - return( (node1.getCompatibleJobs() & node2.getCompatibleJobs()) > 0); - } - - /** - * Function that checks if the given node executes in the job specified by jt. 
- * - * @param node low-level operator - * @param jt job type - * @return true if node executes in the specified job type - */ - private static boolean isCompatible(Lop node, JobType jt) { - if ( jt == JobType.GMRCELL ) - jt = JobType.GMR; - return ((node.getCompatibleJobs() & jt.getBase()) > 0); - } + // if node has more than two inputs, + // remove children that will be needed in a future + // iterations + // may also have to remove parent nodes of these children + removeNodesForNextIteration(node, finishedNodes, execNodes, + queuedNodes, jobNodes); - /* - * Add node, and its relevant children to job-specific node vectors. - */ - private void addNodeByJobType(Lop node, ArrayList<ArrayList<Lop>> arr, - ArrayList<Lop> execNodes, boolean eliminate) { - - if (!eliminate) { - // Check if this lop defines a MR job. - if ( node.definesMRJob() ) { + continue; + } - // find the corresponding JobType - JobType jt = JobType.findJobTypeFromLop(node); - - if ( jt == null ) { - throw new LopsException(node.printErrorLocation() + "No matching JobType is found for a the lop type: " + node.getType() + " \n"); - } - - // Add "node" to corresponding job vector - - if ( jt == JobType.GMR ) { - if ( node.hasNonBlockedInputs() ) { - int gmrcell_index = JobType.GMRCELL.getId(); - arr.get(gmrcell_index).add(node); - int from = arr.get(gmrcell_index).size(); - addChildren(node, arr.get(gmrcell_index), execNodes); - int to = arr.get(gmrcell_index).size(); - if (!isCompatible(arr.get(gmrcell_index),JobType.GMR, from, to)) // check against GMR only, not against GMRCELL - throw new LopsException(node.printErrorLocation() + "Error during compatibility check \n"); - } - else { - // if "node" (in this case, a group lop) has any inputs from RAND - // then add it to RAND job. Otherwise, create a GMR job - if (hasChildNode(node, arr.get(JobType.DATAGEN.getId()) )) { - arr.get(JobType.DATAGEN.getId()).add(node); - // we should NOT call 'addChildren' because appropriate - // child nodes would have got added to RAND job already - } else { - int gmr_index = JobType.GMR.getId(); - arr.get(gmr_index).add(node); - int from = arr.get(gmr_index).size(); - addChildren(node, arr.get(gmr_index), execNodes); - int to = arr.get(gmr_index).size(); - if (!isCompatible(arr.get(gmr_index),JobType.GMR, from, to)) - throw new LopsException(node.printErrorLocation() + "Error during compatibility check \n"); + // if inputs come from different jobs, then queue + if ( node.getInputs().size() >= 2) { + int jobid = Integer.MIN_VALUE; + boolean queueit = false; + for(int idx=0; idx < node.getInputs().size(); idx++) { + int input_jobid = jobType(node.getInputs().get(idx), jobNodes); + if (input_jobid != -1) { + if ( jobid == Integer.MIN_VALUE ) + jobid = input_jobid; + else if ( jobid != input_jobid ) { + queueit = true; + break; + } } } - } - else { - int index = jt.getId(); - arr.get(index).add(node); - int from = arr.get(index).size(); - addChildren(node, arr.get(index), execNodes); - int to = arr.get(index).size(); - // check if all added nodes are compatible with current job - if (!isCompatible(arr.get(index), jt, from, to)) { - throw new LopsException( - "Unexpected error in addNodeByType."); + if ( queueit ) { + if( LOG.isTraceEnabled() ) + LOG.trace(indent + "Queueing node " + node.toString() + " (code 3)"); + queuedNodes.add(node); + removeNodesForNextIteration(node, finishedNodes, execNodes, queuedNodes, jobNodes); + continue; } } - return; - } - } - - if ( eliminate ) { - // Eliminated lops are directly added to GMR queue. 
- // Note that eliminate flag is set only for 'group' lops - if ( node.hasNonBlockedInputs() ) - arr.get(JobType.GMRCELL.getId()).add(node); - else - arr.get(JobType.GMR.getId()).add(node); - return; - } - - /* - * If this lop does not define a job, check if it uses the output of any - * specialized job. i.e., if this lop has a child node in any of the - * job-specific vector, then add it to the vector. Note: This lop must - * be added to ONLY ONE of the job-specific vectors. - */ - int numAdded = 0; - for ( JobType j : JobType.values() ) { - if ( j.getId() > 0 && hasDirectChildNode(node, arr.get(j.getId()))) { - if (isCompatible(node, j)) { - arr.get(j.getId()).add(node); - numAdded += 1; + // See if this lop can be eliminated + // This check is for "aligner" lops (e.g., group) + boolean eliminate = false; + eliminate = canEliminateLop(node, execNodes); + if (eliminate) { + if( LOG.isTraceEnabled() ) + LOG.trace(indent + "Adding -"+ node.toString()); + execNodes.add(node); + finishedNodes.add(node); + addNodeByJobType(node, jobNodes, execNodes, eliminate); + continue; } - } - } - if (numAdded > 1) { - throw new LopsException("Unexpected error in addNodeByJobType(): A given lop can ONLY be added to a single job vector (numAdded = " + numAdded + ")." ); - } - } - /* - * Remove the node from all job-specific node vectors. This method is - * invoked from removeNodesForNextIteration(). - */ - private static void removeNodeByJobType(Lop node, ArrayList<ArrayList<Lop>> arr) { - for ( JobType jt : JobType.values()) - if ( jt.getId() > 0 ) - arr.get(jt.getId()).remove(node); - } + // If the node defines a MR Job then make sure none of its + // children that defines a MR Job are present in execNodes + if (node.definesMRJob()) { + if (hasMRJobChildNode(node, execNodes)) { + // "node" must NOT be queued when node=group and the child that defines job is Rand + // this is because "group" can be pushed into the "Rand" job. + if (! (node.getType() == Lop.Type.Grouping && checkDataGenAsChildNode(node,execNodes)) ) { + if( LOG.isTraceEnabled() ) + LOG.trace(indent + "Queueing node " + node.toString() + " (code 4)"); - /** - * As some jobs only write one output, all operations in the mapper need to - * be redone and cannot be marked as finished. - * - * @param execNodes list of exec low-level operators - * @param jobNodes list of job low-level operators - * @param finishedNodes list of finished low-level operators - */ - private void handleSingleOutputJobs(ArrayList<Lop> execNodes, - ArrayList<ArrayList<Lop>> jobNodes, ArrayList<Lop> finishedNodes) - { - /* - * If the input of a MMCJ/MMRJ job (must have executed in a Mapper) is used - * by multiple lops then we should mark it as not-finished. - */ - ArrayList<Lop> nodesWithUnfinishedOutputs = new ArrayList<>(); - int[] jobIndices = {JobType.MMCJ.getId()}; - Lop.Type[] lopTypes = { Lop.Type.MMCJ}; - - // TODO: SortByValue should be treated similar to MMCJ, since it can - // only sort one file now - - for ( int jobi=0; jobi < jobIndices.length; jobi++ ) { - int jindex = jobIndices[jobi]; - if (!jobNodes.get(jindex).isEmpty()) { - ArrayList<Lop> vec = jobNodes.get(jindex); + queuedNodes.add(node); - // first find all nodes with more than one parent that is not finished. 
- for (int i = 0; i < vec.size(); i++) { - Lop node = vec.get(i); - if (node.getExecLocation() == ExecLocation.MapOrReduce - || node.getExecLocation() == ExecLocation.Map) { - Lop MRparent = getParentNode(node, execNodes, ExecLocation.MapAndReduce); - if ( MRparent != null && MRparent.getType() == lopTypes[jobi]) { - int numParents = node.getOutputs().size(); - if (numParents > 1) { - for (int j = 0; j < numParents; j++) { - if (!finishedNodes.contains(node.getOutputs() - .get(j))) - nodesWithUnfinishedOutputs.add(node); - } - - } + removeNodesForNextIteration(node, finishedNodes, + execNodes, queuedNodes, jobNodes); + + continue; } - } + } } - // need to redo all nodes in nodesWithOutput as well as their children - for ( Lop node : vec ) { - if (node.getExecLocation() == ExecLocation.MapOrReduce - || node.getExecLocation() == ExecLocation.Map) { - if (nodesWithUnfinishedOutputs.contains(node)) - finishedNodes.remove(node); + // if "node" has more than one input, and has a descendant lop + // in execNodes that is of type RecordReader + // then all its inputs must be ancestors of RecordReader. If + // not, queue "node" + if (node.getInputs().size() > 1 + && hasChildNode(node, execNodes, ExecLocation.RecordReader)) { + // get the actual RecordReader lop + Lop rr_node = getChildNode(node, execNodes, ExecLocation.RecordReader); - if (hasParentNode(node, nodesWithUnfinishedOutputs)) - finishedNodes.remove(node); + // all inputs of "node" must be ancestors of rr_node + boolean queue_it = false; + for (Lop n : node.getInputs()) { + // each input should be ancestor of RecordReader lop + if (!n.equals(rr_node) && !isChild(rr_node, n, IDMap)) { + queue_it = true; // i.e., "node" must be queued + break; + } + } + if (queue_it) { + // queue node + if( LOG.isTraceEnabled() ) + LOG.trace(indent + "Queueing -" + node.toString() + " (code 5)"); + queuedNodes.add(node); + // TODO: does this have to be modified to handle + // recordreader lops? + removeNodesForNextIteration(node, finishedNodes, + execNodes, queuedNodes, jobNodes); + continue; + } else { + // nothing here.. subsequent checks have to be performed + // on "node" } } - } - } - - } - - /** - * Method to check if a lop can be eliminated from checking - * - * @param node low-level operator - * @param execNodes list of exec nodes - * @return true if lop can be eliminated - */ - private static boolean canEliminateLop(Lop node, ArrayList<Lop> execNodes) { - // this function can only eliminate "aligner" lops such a group - if (!node.isAligner()) - return false; - // find the child whose execLoc = 'MapAndReduce' - int ret = getChildAlignment(node, execNodes, ExecLocation.MapAndReduce); - - if (ret == CHILD_BREAKS_ALIGNMENT) - return false; - else if (ret == CHILD_DOES_NOT_BREAK_ALIGNMENT) - return true; - else if (ret == MRCHILD_NOT_FOUND) - return false; - else if (ret == MR_CHILD_FOUND_BREAKS_ALIGNMENT) - return false; - else if (ret == MR_CHILD_FOUND_DOES_NOT_BREAK_ALIGNMENT) - return true; - else - throw new RuntimeException("Should not happen. \n"); - } - - - /** - * Method to generate createvar instructions, which creates a new entry - * in the symbol table. One instruction is generated for every LOP that is - * 1) type Data and - * 2) persistent and - * 3) matrix and - * 4) read - * - * Transient reads needn't be considered here since the previous program - * block would already create appropriate entries in the symbol table. 
- * - * @param nodes_v list of nodes - * @param inst list of instructions - */ - private static void generateInstructionsForInputVariables(ArrayList<Lop> nodes_v, ArrayList<Instruction> inst) { - for(Lop n : nodes_v) { - if (n.getExecLocation() == ExecLocation.Data && !((Data) n).isTransient() - && ((Data) n).getOperationType() == OperationTypes.READ - && (n.getDataType() == DataType.MATRIX || n.getDataType() == DataType.FRAME) ) { - - if ( !((Data)n).isLiteral() ) { - try { - String inst_string = n.getInstructions(); - CPInstruction currInstr = CPInstructionParser.parseSingleInstruction(inst_string); - currInstr.setLocation(n); - inst.add(currInstr); - } catch (DMLRuntimeException e) { - throw new LopsException(n.printErrorLocation() + "error generating instructions from input variables in Dag -- \n", e); - } - } - } - } - } - - - /** - * Determine whether to send <code>node</code> to MR or to process it in the control program. - * It is sent to MR in the following cases: - * - * 1) if input lop gets processed in MR then <code>node</code> can be piggybacked - * - * 2) if the exectype of write lop itself is marked MR i.e., memory estimate > memory budget. - * - * @param node low-level operator - * @return true if lop should be sent to MR - */ - private static boolean sendWriteLopToMR(Lop node) - { - if ( DMLScript.rtplatform == RUNTIME_PLATFORM.SINGLE_NODE ) - return false; - Lop in = node.getInputs().get(0); - Format nodeFormat = node.getOutputParameters().getFormat(); - - //send write lop to MR if (1) it is marked with exec type MR (based on its memory estimate), or - //(2) if the input lop is in MR and the write format allows to pack it into the same job (this does - //not apply to csv write because MR csvwrite is a separate MR job type) - return (node.getExecType() == ExecType.MR - || (in.getExecType() == ExecType.MR && nodeFormat != Format.CSV)); - } - - /** - * Computes the memory footprint required to execute <code>node</code> in the mapper. - * It is used only for those nodes that use inputs from distributed cache. The returned - * value is utilized in limiting the number of instructions piggybacked onto a single GMR mapper. - * - * @param node low-level operator - * @return memory footprint - */ - private static double computeFootprintInMapper(Lop node) { - // Memory limits must be checked only for nodes that use distributed cache - if ( ! 
node.usesDistributedCache() ) - // default behavior - return 0.0; - - OutputParameters in1dims = node.getInputs().get(0).getOutputParameters(); - OutputParameters in2dims = node.getInputs().get(1).getOutputParameters(); - - double footprint = 0; - if ( node instanceof MapMult ) { - int dcInputIndex = node.distributedCacheInputIndex()[0]; - footprint = AggBinaryOp.getMapmmMemEstimate( - in1dims.getNumRows(), in1dims.getNumCols(), in1dims.getRowsInBlock(), in1dims.getColsInBlock(), in1dims.getNnz(), - in2dims.getNumRows(), in2dims.getNumCols(), in2dims.getRowsInBlock(), in2dims.getColsInBlock(), in2dims.getNnz(), - dcInputIndex, false); - } - else if ( node instanceof PMMJ ) { - int dcInputIndex = node.distributedCacheInputIndex()[0]; - footprint = AggBinaryOp.getMapmmMemEstimate( - in1dims.getNumRows(), 1, in1dims.getRowsInBlock(), in1dims.getColsInBlock(), in1dims.getNnz(), - in2dims.getNumRows(), in2dims.getNumCols(), in2dims.getRowsInBlock(), in2dims.getColsInBlock(), in2dims.getNnz(), - dcInputIndex, true); - } - else if ( node instanceof AppendM ) { - footprint = BinaryOp.footprintInMapper( - in1dims.getNumRows(), in1dims.getNumCols(), - in2dims.getNumRows(), in2dims.getNumCols(), - in1dims.getRowsInBlock(), in1dims.getColsInBlock()); - } - else if ( node instanceof BinaryM ) { - footprint = BinaryOp.footprintInMapper( - in1dims.getNumRows(), in1dims.getNumCols(), - in2dims.getNumRows(), in2dims.getNumCols(), - in1dims.getRowsInBlock(), in1dims.getColsInBlock()); - } - else { - // default behavior - return 0.0; - } - return footprint; - } - - /** - * Determines if <code>node</code> can be executed in current round of MR jobs or if it needs to be queued for later rounds. - * If the total estimated footprint (<code>node</code> and previously added nodes in GMR) is less than available memory on - * the mappers then <code>node</code> can be executed in current round, and <code>true</code> is returned. Otherwise, - * <code>node</code> must be queued and <code>false</code> is returned. - * - * @param node low-level operator - * @param footprintInMapper mapper footprint - * @return true if node can be executed in current round of jobs - */ - private static boolean checkMemoryLimits(Lop node, double footprintInMapper) { - boolean addNode = true; - - // Memory limits must be checked only for nodes that use distributed cache - if ( ! node.usesDistributedCache() ) - // default behavior - return addNode; - - double memBudget = Math.min(AggBinaryOp.MAPMULT_MEM_MULTIPLIER, BinaryOp.APPEND_MEM_MULTIPLIER) * OptimizerUtils.getRemoteMemBudgetMap(true); - if ( footprintInMapper <= memBudget ) - return addNode; - else - return !addNode; - } - - /** - * Method to group a vector of sorted lops. 
- * - * @param sb statement block - * @param node_v list of low-level operators - * @return list of instructions - */ - private ArrayList<Instruction> doGreedyGrouping(StatementBlock sb, ArrayList<Lop> node_v) - { - if( LOG.isTraceEnabled() ) - LOG.trace("Grouping DAG ============"); - - // nodes to be executed in current iteration - ArrayList<Lop> execNodes = new ArrayList<>(); - // nodes that have already been processed - ArrayList<Lop> finishedNodes = new ArrayList<>(); - // nodes that are queued for the following iteration - ArrayList<Lop> queuedNodes = new ArrayList<>(); - - ArrayList<ArrayList<Lop>> jobNodes = createNodeVectors(JobType.getNumJobTypes()); - - // list of instructions - ArrayList<Instruction> inst = new ArrayList<>(); - - //ArrayList<Instruction> preWriteDeleteInst = new ArrayList<Instruction>(); - ArrayList<Instruction> writeInst = new ArrayList<>(); - ArrayList<Instruction> deleteInst = new ArrayList<>(); - ArrayList<Instruction> endOfBlockInst = new ArrayList<>(); - - // remove files for transient reads that are updated. - deleteUpdatedTransientReadVariables(sb, node_v, writeInst); - - generateRemoveInstructions(sb, endOfBlockInst); - - generateInstructionsForInputVariables(node_v, inst); - - - boolean done = false; - String indent = " "; - - while (!done) { - if( LOG.isTraceEnabled() ) - LOG.trace("Grouping nodes in DAG"); - - execNodes.clear(); - queuedNodes.clear(); - clearNodeVectors(jobNodes); - gmrMapperFootprint=0; - - for ( Lop node : node_v ) { - // finished nodes don't need to be processed - if (finishedNodes.contains(node)) - continue; - - if( LOG.isTraceEnabled() ) - LOG.trace("Processing node (" + node.getID() - + ") " + node.toString() + " exec nodes size is " + execNodes.size()); - - - //if node defines MR job, make sure it is compatible with all - //its children nodes in execNodes - if(node.definesMRJob() && !compatibleWithChildrenInExecNodes(execNodes, node)) - { - if( LOG.isTraceEnabled() ) - LOG.trace(indent + "Queueing node " - + node.toString() + " (code 1)"); - - queuedNodes.add(node); - removeNodesForNextIteration(node, finishedNodes, execNodes, queuedNodes, jobNodes); - continue; - } - - // if child is queued, this node will be processed in the later - // iteration - if (hasChildNode(node,queuedNodes)) { - - if( LOG.isTraceEnabled() ) - LOG.trace(indent + "Queueing node " - + node.toString() + " (code 2)"); - queuedNodes.add(node); - - // if node has more than two inputs, - // remove children that will be needed in a future - // iterations - // may also have to remove parent nodes of these children - removeNodesForNextIteration(node, finishedNodes, execNodes, - queuedNodes, jobNodes); - - continue; - } - - // if inputs come from different jobs, then queue - if ( node.getInputs().size() >= 2) { - int jobid = Integer.MIN_VALUE; - boolean queueit = false; - for(int idx=0; idx < node.getInputs().size(); idx++) { - int input_jobid = jobType(node.getInputs().get(idx), jobNodes); - if (input_jobid != -1) { - if ( jobid == Integer.MIN_VALUE ) - jobid = input_jobid; - else if ( jobid != input_jobid ) { - queueit = true; - break; - } - } - } - if ( queueit ) { - if( LOG.isTraceEnabled() ) - LOG.trace(indent + "Queueing node " + node.toString() + " (code 3)"); - queuedNodes.add(node); - removeNodesForNextIteration(node, finishedNodes, execNodes, queuedNodes, jobNodes); - continue; - } - } - - // See if this lop can be eliminated - // This check is for "aligner" lops (e.g., group) - boolean eliminate = false; - eliminate = canEliminateLop(node, 
execNodes); - if (eliminate) { - if( LOG.isTraceEnabled() ) - LOG.trace(indent + "Adding -"+ node.toString()); - execNodes.add(node); - finishedNodes.add(node); - addNodeByJobType(node, jobNodes, execNodes, eliminate); - continue; - } - - // If the node defines a MR Job then make sure none of its - // children that defines a MR Job are present in execNodes - if (node.definesMRJob()) { - if (hasMRJobChildNode(node, execNodes)) { - // "node" must NOT be queued when node=group and the child that defines job is Rand - // this is because "group" can be pushed into the "Rand" job. - if (! (node.getType() == Lop.Type.Grouping && checkDataGenAsChildNode(node,execNodes)) ) { - if( LOG.isTraceEnabled() ) - LOG.trace(indent + "Queueing node " + node.toString() + " (code 4)"); - - queuedNodes.add(node); - - removeNodesForNextIteration(node, finishedNodes, - execNodes, queuedNodes, jobNodes); - - continue; - } - } - } - - // if "node" has more than one input, and has a descendant lop - // in execNodes that is of type RecordReader - // then all its inputs must be ancestors of RecordReader. If - // not, queue "node" - if (node.getInputs().size() > 1 - && hasChildNode(node, execNodes, ExecLocation.RecordReader)) { - // get the actual RecordReader lop - Lop rr_node = getChildNode(node, execNodes, ExecLocation.RecordReader); - - // all inputs of "node" must be ancestors of rr_node - boolean queue_it = false; - for (Lop n : node.getInputs()) { - // each input should be ancestor of RecordReader lop - if (!n.equals(rr_node) && !isChild(rr_node, n, IDMap)) { - queue_it = true; // i.e., "node" must be queued - break; - } - } - - if (queue_it) { - // queue node - if( LOG.isTraceEnabled() ) - LOG.trace(indent + "Queueing -" + node.toString() + " (code 5)"); - queuedNodes.add(node); - // TODO: does this have to be modified to handle - // recordreader lops? - removeNodesForNextIteration(node, finishedNodes, - execNodes, queuedNodes, jobNodes); - continue; - } else { - // nothing here.. subsequent checks have to be performed - // on "node" - } - } - - // data node, always add if child not queued - // only write nodes are kept in execnodes - if (node.getExecLocation() == ExecLocation.Data) { - Data dnode = (Data) node; - boolean dnode_queued = false; - - if ( dnode.getOperationType() == OperationTypes.READ ) { - if( LOG.isTraceEnabled() ) - LOG.trace(indent + "Adding Data -"+ node.toString()); + // data node, always add if child not queued + // only write nodes are kept in execnodes + if (node.getExecLocation() == ExecLocation.Data) { + Data dnode = (Data) node; + boolean dnode_queued = false; + + if ( dnode.getOperationType() == OperationTypes.READ ) { + if( LOG.isTraceEnabled() ) + LOG.trace(indent + "Adding Data -"+ node.toString()); // TODO: avoid readScalar instruction, and read it on-demand just like the way Matrices are read in control program if ( node.getDataType() == DataType.SCALAR @@ -957,10 +542,7 @@ public class Dag<N extends Lop> // TODO: this case should ideally be handled in the language layer // prior to the construction of Hops Dag Lop input = dnode.getInputs().get(0); - if ( dnode.isTransient() - && input.getExecLocation() == ExecLocation.Data - && ((Data)input).isTransient() - && dnode.getOutputParameters().getLabel().equals(input.getOutputParameters().getLabel()) ) { + if ( isTransientWriteRead(dnode) ) { // do nothing, <code>node</code> must not processed any further. 
} else if ( execNodes.contains(input) && !isCompatible(node, input) && sendWriteLopToMR(node)) { @@ -1078,122 +660,601 @@ public class Dag<N extends Lop> continue; } - // aligned reduce, make sure a parent that is reduce exists - if (node.getExecLocation() == ExecLocation.Reduce) { - if ( compatibleWithChildrenInExecNodes(execNodes, node) && - (hasChildNode(node, execNodes, ExecLocation.MapAndReduce) - || hasChildNode(node, execNodes, ExecLocation.Map) ) ) - { - if( LOG.isTraceEnabled() ) - LOG.trace(indent + "Adding -"+ node.toString()); - execNodes.add(node); - finishedNodes.add(node); - addNodeByJobType(node, jobNodes, execNodes, false); - } else { - if( LOG.isTraceEnabled() ) - LOG.trace(indent + "Queueing -"+ node.toString() + " (code 8)"); - queuedNodes.add(node); - removeNodesForNextIteration(node, finishedNodes, - execNodes, queuedNodes, jobNodes); - } + // aligned reduce, make sure a parent that is reduce exists + if (node.getExecLocation() == ExecLocation.Reduce) { + if ( compatibleWithChildrenInExecNodes(execNodes, node) && + (hasChildNode(node, execNodes, ExecLocation.MapAndReduce) + || hasChildNode(node, execNodes, ExecLocation.Map) ) ) + { + if( LOG.isTraceEnabled() ) + LOG.trace(indent + "Adding -"+ node.toString()); + execNodes.add(node); + finishedNodes.add(node); + addNodeByJobType(node, jobNodes, execNodes, false); + } else { + if( LOG.isTraceEnabled() ) + LOG.trace(indent + "Queueing -"+ node.toString() + " (code 8)"); + queuedNodes.add(node); + removeNodesForNextIteration(node, finishedNodes, + execNodes, queuedNodes, jobNodes); + } + + continue; + + } + + // add Scalar to execNodes if it has no child in exec nodes + // that will be executed in a MR job. + if (node.getExecLocation() == ExecLocation.ControlProgram) { + for ( Lop lop : node.getInputs() ) { + if (execNodes.contains(lop) + && !(lop.getExecLocation() == ExecLocation.Data) + && !(lop.getExecLocation() == ExecLocation.ControlProgram)) { + if( LOG.isTraceEnabled() ) + LOG.trace(indent + "Queueing -"+ node.toString() + " (code 9)"); + + queuedNodes.add(node); + removeNodesForNextIteration(node, finishedNodes, + execNodes, queuedNodes, jobNodes); + break; + } + } + + if (queuedNodes.contains(node)) + continue; + if( LOG.isTraceEnabled() ) + LOG.trace(indent + "Adding - scalar"+ node.toString()); + execNodes.add(node); + addNodeByJobType(node, jobNodes, execNodes, false); + finishedNodes.add(node); + continue; + } + + } + + // no work to do + if ( execNodes.isEmpty() ) { + + if( !queuedNodes.isEmpty() ) + throw new LopsException("Queued nodes should not be 0 at this point \n"); + + if( LOG.isTraceEnabled() ) + LOG.trace("All done! 
queuedNodes = "+ queuedNodes.size()); + + done = true; + } else { + // work to do + + if( LOG.isTraceEnabled() ) + LOG.trace("Generating jobs for group -- Node count="+ execNodes.size()); + + // first process scalar instructions + generateControlProgramJobs(execNodes, inst, writeInst, deleteInst); + + // copy unassigned lops in execnodes to gmrnodes + for (int i = 0; i < execNodes.size(); i++) { + Lop node = execNodes.get(i); + if (jobType(node, jobNodes) == -1) { + if ( isCompatible(node, JobType.GMR) ) { + if ( node.hasNonBlockedInputs() ) { + jobNodes.get(JobType.GMRCELL.getId()).add(node); + addChildren(node, jobNodes.get(JobType.GMRCELL.getId()), execNodes); + } + else { + jobNodes.get(JobType.GMR.getId()).add(node); + addChildren(node, jobNodes.get(JobType.GMR.getId()), execNodes); + } + } + else { + if( LOG.isTraceEnabled() ) + LOG.trace(indent + "Queueing -" + node.toString() + " (code 10)"); + execNodes.remove(i); + finishedNodes.remove(node); + queuedNodes.add(node); + removeNodesForNextIteration(node, finishedNodes, + execNodes, queuedNodes, jobNodes); + } + } + } + + // next generate MR instructions + if (!execNodes.isEmpty()) + generateMRJobs(execNodes, inst, writeInst, deleteInst, jobNodes); + handleSingleOutputJobs(execNodes, jobNodes, finishedNodes); + } + } + + // add write and delete inst at the very end. + inst.addAll(writeInst); + inst.addAll(deleteInst); + inst.addAll(endOfBlockInst); + return inst; + } + + private ArrayList<Instruction> doPlainInstructionGen(StatementBlock sb, List<Lop> nodes) + { + //prepare basic instruction sets + List<Instruction> deleteInst = new ArrayList<>(); + List<Instruction> writeInst = deleteUpdatedTransientReadVariables(sb, nodes); + List<Instruction> endOfBlockInst = generateRemoveInstructions(sb); + ArrayList<Instruction> inst = generateInstructionsForInputVariables(nodes); + + // filter out non-executable nodes + List<Lop> execNodes = nodes.stream() + .filter(l -> (l.getExecLocation() != ExecLocation.Data + || (((Data)l).getOperationType()==OperationTypes.WRITE && !isTransientWriteRead((Data)l)) + || (((Data)l).isPersistentRead() && l.getDataType().isScalar()))) + .collect(Collectors.toList()); + + // generate executable instruction + generateControlProgramJobs(execNodes, inst, writeInst, deleteInst); + + // add write and delete inst at the very end. + inst.addAll(writeInst); + inst.addAll(deleteInst); + inst.addAll(endOfBlockInst); + return inst; + } + + private boolean isTransientWriteRead(Data dnode) { + Lop input = dnode.getInputs().get(0); + return dnode.isTransient() + && input.getExecLocation() == ExecLocation.Data && ((Data)input).isTransient() + && dnode.getOutputParameters().getLabel().equals(input.getOutputParameters().getLabel()); + } + + private static List<Instruction> deleteUpdatedTransientReadVariables(StatementBlock sb, List<Lop> nodeV) { + List<Instruction> insts = new ArrayList<>(); + if ( sb == null ) //return modifiable list + return insts; + + if( LOG.isTraceEnabled() ) + LOG.trace("In delete updated variables"); + + // CANDIDATE list of variables which could have been updated in this statement block + HashMap<String, Lop> labelNodeMapping = new HashMap<>(); + + // ACTUAL list of variables whose value is updated, AND the old value of the variable + // is no longer accessible/used. 
+ HashSet<String> updatedLabels = new HashSet<>(); + HashMap<String, Lop> updatedLabelsLineNum = new HashMap<>(); + + // first capture all transient read variables + for ( Lop node : nodeV ) { + if (node.getExecLocation() == ExecLocation.Data + && ((Data) node).isTransient() + && ((Data) node).getOperationType() == OperationTypes.READ + && ((Data) node).getDataType() == DataType.MATRIX) { + // "node" is considered as updated ONLY IF the old value is not used any more + // So, make sure that this READ node does not feed into any (transient/persistent) WRITE + boolean hasWriteParent=false; + for(Lop p : node.getOutputs()) { + if(p.getExecLocation() == ExecLocation.Data) { + // if the "p" is of type Data, then it has to be a WRITE + hasWriteParent = true; + break; + } + } + if ( !hasWriteParent ) { + // node has no parent of type WRITE, so this is a CANDIDATE variable + // add it to labelNodeMapping so that it is considered in further processing + labelNodeMapping.put(node.getOutputParameters().getLabel(), node); + } + } + } + + // capture updated transient write variables + for ( Lop node : nodeV ) { + if (node.getExecLocation() == ExecLocation.Data + && ((Data) node).isTransient() + && ((Data) node).getOperationType() == OperationTypes.WRITE + && ((Data) node).getDataType() == DataType.MATRIX + && labelNodeMapping.containsKey(node.getOutputParameters().getLabel()) // check to make sure corresponding (i.e., with the same label/name) transient read is present + && !labelNodeMapping.containsValue(node.getInputs().get(0)) ){ // check to avoid cases where transient read feeds into a transient write + updatedLabels.add(node.getOutputParameters().getLabel()); + updatedLabelsLineNum.put(node.getOutputParameters().getLabel(), node); + } + } + + // generate RM instructions + Instruction rm_inst = null; + for ( String label : updatedLabels ) { + rm_inst = VariableCPInstruction.prepareRemoveInstruction(label); + rm_inst.setLocation(updatedLabelsLineNum.get(label)); + if( LOG.isTraceEnabled() ) + LOG.trace(rm_inst.toString()); + insts.add(rm_inst); + } + return insts; + } + + private static List<Instruction> generateRemoveInstructions(StatementBlock sb) { + if ( sb == null ) + return Collections.emptyList(); + ArrayList<Instruction> insts = new ArrayList<>(); + + if( LOG.isTraceEnabled() ) + LOG.trace("In generateRemoveInstructions()"); + + // RULE 1: if in IN and not in OUT, then there should be an rmvar or rmfilevar inst + // (currently required for specific cases of external functions) + for (String varName : sb.liveIn().getVariableNames()) { + if (!sb.liveOut().containsVariable(varName)) { + Instruction inst = VariableCPInstruction.prepareRemoveInstruction(varName); + inst.setLocation(sb.getFilename(), sb.getEndLine(), sb.getEndLine(), -1, -1); + insts.add(inst); + if( LOG.isTraceEnabled() ) + LOG.trace(" Adding " + inst.toString()); + } + } + return insts; + } + + private static List<List<Lop>> createNodeVectors(int size) { + return IntStream.range(0, size).mapToObj(i -> + new ArrayList<Lop>()).collect(Collectors.toList()); + } - continue; + private static void clearNodeVectors(List<List<Lop>> arr) { + arr.stream().forEach(t -> t.clear()); + } - } + private static boolean isCompatible(List<Lop> nodes, JobType jt, int from, int to) { + return nodes.stream().allMatch(n -> (n.getCompatibleJobs() & jt.getBase()) != 0); + } - // add Scalar to execNodes if it has no child in exec nodes - // that will be executed in a MR job. 
- if (node.getExecLocation() == ExecLocation.ControlProgram) { - for ( Lop lop : node.getInputs() ) { - if (execNodes.contains(lop) - && !(lop.getExecLocation() == ExecLocation.Data) - && !(lop.getExecLocation() == ExecLocation.ControlProgram)) { - if( LOG.isTraceEnabled() ) - LOG.trace(indent + "Queueing -"+ node.toString() + " (code 9)"); + /** + * Function that determines if the two input nodes can be executed together + * in at least one job. + * + * @param node1 low-level operator 1 + * @param node2 low-level operator 2 + * @return true if nodes can be executed together + */ + private static boolean isCompatible(Lop node1, Lop node2) { + return( (node1.getCompatibleJobs() & node2.getCompatibleJobs()) > 0); + } + + /** + * Function that checks if the given node executes in the job specified by jt. + * + * @param node low-level operator + * @param jt job type + * @return true if node executes in the specified job type + */ + private static boolean isCompatible(Lop node, JobType jt) { + if ( jt == JobType.GMRCELL ) + jt = JobType.GMR; + return ((node.getCompatibleJobs() & jt.getBase()) > 0); + } - queuedNodes.add(node); - removeNodesForNextIteration(node, finishedNodes, - execNodes, queuedNodes, jobNodes); - break; + /* + * Add node, and its relevant children to job-specific node vectors. + */ + private void addNodeByJobType(Lop node, List<List<Lop>> arr, List<Lop> execNodes, boolean eliminate) { + if (!eliminate) { + // Check if this lop defines a MR job. + if ( node.definesMRJob() ) { + + // find the corresponding JobType + JobType jt = JobType.findJobTypeFromLop(node); + + if ( jt == null ) { + throw new LopsException(node.printErrorLocation() + "No matching JobType is found for the lop type: " + node.getType() + " \n"); + } + + // Add "node" to corresponding job vector + + if ( jt == JobType.GMR ) { + if ( node.hasNonBlockedInputs() ) { + int gmrcell_index = JobType.GMRCELL.getId(); + arr.get(gmrcell_index).add(node); + int from = arr.get(gmrcell_index).size(); + addChildren(node, arr.get(gmrcell_index), execNodes); + int to = arr.get(gmrcell_index).size(); + if (!isCompatible(arr.get(gmrcell_index),JobType.GMR, from, to)) // check against GMR only, not against GMRCELL + throw new LopsException(node.printErrorLocation() + "Error during compatibility check \n"); + } + else { + // if "node" (in this case, a group lop) has any inputs from RAND + // then add it to RAND job. 
Otherwise, create a GMR job + if (hasChildNode(node, arr.get(JobType.DATAGEN.getId()) )) { + arr.get(JobType.DATAGEN.getId()).add(node); + // we should NOT call 'addChildren' because appropriate + // child nodes would have got added to RAND job already + } else { + int gmr_index = JobType.GMR.getId(); + arr.get(gmr_index).add(node); + int from = arr.get(gmr_index).size(); + addChildren(node, arr.get(gmr_index), execNodes); + int to = arr.get(gmr_index).size(); + if (!isCompatible(arr.get(gmr_index),JobType.GMR, from, to)) + throw new LopsException(node.printErrorLocation() + "Error during compatibility check \n"); } } - - if (queuedNodes.contains(node)) - continue; - if( LOG.isTraceEnabled() ) - LOG.trace(indent + "Adding - scalar"+ node.toString()); - execNodes.add(node); - addNodeByJobType(node, jobNodes, execNodes, false); - finishedNodes.add(node); - continue; } - + else { + int index = jt.getId(); + arr.get(index).add(node); + int from = arr.get(index).size(); + addChildren(node, arr.get(index), execNodes); + int to = arr.get(index).size(); + // check if all added nodes are compatible with current job + if (!isCompatible(arr.get(index), jt, from, to)) { + throw new LopsException( + "Unexpected error in addNodeByType."); + } + } + return; } + } - // no work to do - if ( execNodes.isEmpty() ) { - - if( !queuedNodes.isEmpty() ) - throw new LopsException("Queued nodes should not be 0 at this point \n"); - - if( LOG.isTraceEnabled() ) - LOG.trace("All done! queuedNodes = "+ queuedNodes.size()); - - done = true; - } else { - // work to do + if ( eliminate ) { + // Eliminated lops are directly added to GMR queue. + // Note that eliminate flag is set only for 'group' lops + if ( node.hasNonBlockedInputs() ) + arr.get(JobType.GMRCELL.getId()).add(node); + else + arr.get(JobType.GMR.getId()).add(node); + return; + } + + /* + * If this lop does not define a job, check if it uses the output of any + * specialized job. i.e., if this lop has a child node in any of the + * job-specific vector, then add it to the vector. Note: This lop must + * be added to ONLY ONE of the job-specific vectors. + */ - if( LOG.isTraceEnabled() ) - LOG.trace("Generating jobs for group -- Node count="+ execNodes.size()); + int numAdded = 0; + for ( JobType j : JobType.values() ) { + if ( j.getId() > 0 && hasDirectChildNode(node, arr.get(j.getId()))) { + if (isCompatible(node, j)) { + arr.get(j.getId()).add(node); + numAdded += 1; + } + } + } + if (numAdded > 1) { + throw new LopsException("Unexpected error in addNodeByJobType(): A given lop can ONLY be added to a single job vector (numAdded = " + numAdded + ")." ); + } + } - // first process scalar instructions - generateControlProgramJobs(execNodes, inst, writeInst, deleteInst); + /* + * Remove the node from all job-specific node vectors. This method is + * invoked from removeNodesForNextIteration(). 
+ */ + private static void removeNodeByJobType(Lop node, List<List<Lop>> arr) { + for ( JobType jt : JobType.values()) + if ( jt.getId() > 0 ) + arr.get(jt.getId()).remove(node); + } + /** + * As some jobs only write one output, all operations in the mapper need to + * be redone and cannot be marked as finished. + * + * @param execNodes list of exec low-level operators + * @param jobNodes list of job low-level operators + * @param finishedNodes list of finished low-level operators + */ + private void handleSingleOutputJobs(List<Lop> execNodes, List<List<Lop>> jobNodes, List<Lop> finishedNodes) + { + /* + * If the input of a MMCJ/MMRJ job (must have executed in a Mapper) is used + * by multiple lops then we should mark it as not-finished. + */ + ArrayList<Lop> nodesWithUnfinishedOutputs = new ArrayList<>(); + int[] jobIndices = {JobType.MMCJ.getId()}; + Lop.Type[] lopTypes = { Lop.Type.MMCJ}; + + // TODO: SortByValue should be treated similar to MMCJ, since it can + // only sort one file now + + for ( int jobi=0; jobi < jobIndices.length; jobi++ ) { + int jindex = jobIndices[jobi]; + if (!jobNodes.get(jindex).isEmpty()) { + List<Lop> vec = jobNodes.get(jindex); + // first find all nodes with more than one parent that is not finished. + for (int i = 0; i < vec.size(); i++) { + Lop node = vec.get(i); + if (node.getExecLocation() == ExecLocation.MapOrReduce + || node.getExecLocation() == ExecLocation.Map) { + Lop MRparent = getParentNode(node, execNodes, ExecLocation.MapAndReduce); + if ( MRparent != null && MRparent.getType() == lopTypes[jobi]) { + int numParents = node.getOutputs().size(); + if (numParents > 1) { + for (int j = 0; j < numParents; j++) { + if (!finishedNodes.contains(node.getOutputs() + .get(j))) + nodesWithUnfinishedOutputs.add(node); + } } } + } + } + + // need to redo all nodes in nodesWithOutput as well as their children + for ( Lop node : vec ) { + if (node.getExecLocation() == ExecLocation.MapOrReduce + || node.getExecLocation() == ExecLocation.Map) { + if (nodesWithUnfinishedOutputs.contains(node)) + finishedNodes.remove(node); + if (hasParentNode(node, nodesWithUnfinishedOutputs)) + finishedNodes.remove(node); + } + } + } + } + + } + + /** + * Method to check if a lop can be eliminated from checking + * + * @param node low-level operator + * @param execNodes list of exec nodes + * @return true if lop can be eliminated + */ + private static boolean canEliminateLop(Lop node, List<Lop> execNodes) { + // this function can only eliminate "aligner" lops such as group + if (!node.isAligner()) + return false; + // find the child whose execLoc = 'MapAndReduce' + int ret = getChildAlignment(node, execNodes, ExecLocation.MapAndReduce); + if (ret == CHILD_BREAKS_ALIGNMENT) + return false; + else if (ret == CHILD_DOES_NOT_BREAK_ALIGNMENT) + return true; 
+ else if (ret == MRCHILD_NOT_FOUND) + return false; + else if (ret == MR_CHILD_FOUND_BREAKS_ALIGNMENT) + return false; + else if (ret == MR_CHILD_FOUND_DOES_NOT_BREAK_ALIGNMENT) + return true; + else + throw new RuntimeException("Should not happen. \n"); + } + + + /** + * Method to generate createvar instructions, which creates a new entry + * in the symbol table. One instruction is generated for every LOP that is + * 1) type Data and + * 2) persistent and + * 3) matrix and + * 4) read + * + * Transient reads needn't be considered here since the previous program + * block would already create appropriate entries in the symbol table. + * + * @param nodes_v list of nodes + * @return list of instructions + */ + private static ArrayList<Instruction> generateInstructionsForInputVariables(List<Lop> nodes_v) { + ArrayList<Instruction> insts = new ArrayList<>(); + for(Lop n : nodes_v) { + if (n.getExecLocation() == ExecLocation.Data && !((Data) n).isTransient() + && ((Data) n).getOperationType() == OperationTypes.READ + && (n.getDataType() == DataType.MATRIX || n.getDataType() == DataType.FRAME) ) { + if ( !((Data)n).isLiteral() ) { + try { + String inst_string = n.getInstructions(); + CPInstruction currInstr = CPInstructionParser.parseSingleInstruction(inst_string); + currInstr.setLocation(n); + insts.add(currInstr); + } catch (DMLRuntimeException e) { + throw new LopsException(n.printErrorLocation() + "error generating instructions from input variables in Dag -- \n", e); } } - - // next generate MR instructions - if (!execNodes.isEmpty()) - generateMRJobs(execNodes, inst, writeInst, deleteInst, jobNodes); - handleSingleOutputJobs(execNodes, jobNodes, finishedNodes); } } - - // add write and delete inst at the very end. - - //inst.addAll(preWriteDeleteInst); - inst.addAll(writeInst); - inst.addAll(deleteInst); - inst.addAll(endOfBlockInst); - - return inst; - + return insts; + } + + + /** + * Determine whether to send <code>node</code> to MR or to process it in the control program. + * It is sent to MR in the following cases: + * + * 1) if input lop gets processed in MR then <code>node</code> can be piggybacked + * + * 2) if the exectype of write lop itself is marked MR i.e., memory estimate > memory budget. + * + * @param node low-level operator + * @return true if lop should be sent to MR + */ + private static boolean sendWriteLopToMR(Lop node) + { + if ( DMLScript.rtplatform == RUNTIME_PLATFORM.SINGLE_NODE ) + return false; + Lop in = node.getInputs().get(0); + Format nodeFormat = node.getOutputParameters().getFormat(); + + //send write lop to MR if (1) it is marked with exec type MR (based on its memory estimate), or + //(2) if the input lop is in MR and the write format allows to pack it into the same job (this does + //not apply to csv write because MR csvwrite is a separate MR job type) + return (node.getExecType() == ExecType.MR + || (in.getExecType() == ExecType.MR && nodeFormat != Format.CSV)); + } + + /** + * Computes the memory footprint required to execute <code>node</code> in the mapper. + * It is used only for those nodes that use inputs from distributed cache. The returned + * value is utilized in limiting the number of instructions piggybacked onto a single GMR mapper. + * + * @param node low-level operator + * @return memory footprint + */ + private static double computeFootprintInMapper(Lop node) { + // Memory limits must be checked only for nodes that use distributed cache + if ( ! 
node.usesDistributedCache() ) + // default behavior + return 0.0; + + OutputParameters in1dims = node.getInputs().get(0).getOutputParameters(); + OutputParameters in2dims = node.getInputs().get(1).getOutputParameters(); + + double footprint = 0; + if ( node instanceof MapMult ) { + int dcInputIndex = node.distributedCacheInputIndex()[0]; + footprint = AggBinaryOp.getMapmmMemEstimate( + in1dims.getNumRows(), in1dims.getNumCols(), in1dims.getRowsInBlock(), in1dims.getColsInBlock(), in1dims.getNnz(), + in2dims.getNumRows(), in2dims.getNumCols(), in2dims.getRowsInBlock(), in2dims.getColsInBlock(), in2dims.getNnz(), + dcInputIndex, false); + } + else if ( node instanceof PMMJ ) { + int dcInputIndex = node.distributedCacheInputIndex()[0]; + footprint = AggBinaryOp.getMapmmMemEstimate( + in1dims.getNumRows(), 1, in1dims.getRowsInBlock(), in1dims.getColsInBlock(), in1dims.getNnz(), + in2dims.getNumRows(), in2dims.getNumCols(), in2dims.getRowsInBlock(), in2dims.getColsInBlock(), in2dims.getNnz(), + dcInputIndex, true); + } + else if ( node instanceof AppendM ) { + footprint = BinaryOp.footprintInMapper( + in1dims.getNumRows(), in1dims.getNumCols(), + in2dims.getNumRows(), in2dims.getNumCols(), + in1dims.getRowsInBlock(), in1dims.getColsInBlock()); + } + else if ( node instanceof BinaryM ) { + footprint = BinaryOp.footprintInMapper( + in1dims.getNumRows(), in1dims.getNumCols(), + in2dims.getNumRows(), in2dims.getNumCols(), + in1dims.getRowsInBlock(), in1dims.getColsInBlock()); + } + else { + // default behavior + return 0.0; + } + return footprint; + } + + /** + * Determines if <code>node</code> can be executed in current round of MR jobs or if it needs to be queued for later rounds. + * If the total estimated footprint (<code>node</code> and previously added nodes in GMR) is less than available memory on + * the mappers then <code>node</code> can be executed in current round, and <code>true</code> is returned. Otherwise, + * <code>node</code> must be queued and <code>false</code> is returned. + * + * @param node low-level operator + * @param footprintInMapper mapper footprint + * @return true if node can be executed in current round of jobs + */ + private static boolean checkMemoryLimits(Lop node, double footprintInMapper) { + boolean addNode = true; + + // Memory limits must be checked only for nodes that use distributed cache + if ( ! 
node.usesDistributedCache() )
+ // default behavior
+ return addNode;
+
+ double memBudget = Math.min(AggBinaryOp.MAPMULT_MEM_MULTIPLIER, BinaryOp.APPEND_MEM_MULTIPLIER) * OptimizerUtils.getRemoteMemBudgetMap(true);
+ return footprintInMapper <= memBudget;
 }

- private boolean compatibleWithChildrenInExecNodes(ArrayList<Lop> execNodes, Lop node) {
+ private boolean compatibleWithChildrenInExecNodes(List<Lop> execNodes, Lop node) {
 for( Lop tmpNode : execNodes ) {
 // for lops that execute in control program, compatibleJobs property is set to LopProperties.INVALID;
 // we should not consider such lops in this check
@@ -1211,7 +1272,7 @@ public class Dag<N extends Lop>
 * @param varName variable name
 * @param deleteInst list of instructions
 */
- private static void excludeRemoveInstruction(String varName, ArrayList<Instruction> deleteInst) {
+ private static void excludeRemoveInstruction(String varName, List<Instruction> deleteInst) {
 for(int i=0; i < deleteInst.size(); i++) {
 Instruction inst = deleteInst.get(i);
 if ((inst.getType() == IType.CONTROL_PROGRAM || inst.getType() == IType.SPARK)
@@ -1229,7 +1290,7 @@ public class Dag<N extends Lop>
 * @param inst list of instructions
 * @param delteInst list of instructions
 */
- private static void processConsumersForInputs(Lop node, ArrayList<Instruction> inst, ArrayList<Instruction> delteInst) {
+ private static void processConsumersForInputs(Lop node, List<Instruction> inst, List<Instruction> delteInst) {
 // reduce the consumer count for all input lops
 // if the count becomes zero, then the variable associated w/ input can be removed
 for(Lop in : node.getInputs() ) {
@@ -1242,7 +1303,7 @@ public class Dag<N extends Lop>
 }
 }

- private static void processConsumers(Lop node, ArrayList<Instruction> inst, ArrayList<Instruction> deleteInst, Lop locationInfo) {
+ private static void processConsumers(Lop node, List<Instruction> inst, List<Instruction> deleteInst, Lop locationInfo) {
 // reduce the consumer count for all input lops
 // if the count becomes zero, then the variable associated w/ input can be removed
 if ( node.removeConsumer() == 0 ) {
@@ -1272,8 +1333,8 @@ public class Dag<N extends Lop>
 * @param writeInst list of write instructions
 * @param deleteInst list of delete instructions
 */
- private void generateControlProgramJobs(ArrayList<Lop> execNodes,
- ArrayList<Instruction> inst, ArrayList<Instruction> writeInst, ArrayList<Instruction> deleteInst) {
+ private void generateControlProgramJobs(List<Lop> execNodes,
+ List<Instruction> inst, List<Instruction> writeInst, List<Instruction> deleteInst) {

 // nodes to be deleted from execnodes
 ArrayList<Lop> markedNodes = new ArrayList<>();
@@ -1549,9 +1610,8 @@ public class Dag<N extends Lop>
 * @param queuedNodes list of queued nodes
 * @param jobvec list of lists of low-level operators
 */
- private void removeNodesForNextIteration(Lop node, ArrayList<Lop> finishedNodes,
- ArrayList<Lop> execNodes, ArrayList<Lop> queuedNodes,
- ArrayList<ArrayList<Lop>> jobvec) {
+ private void removeNodesForNextIteration(Lop node, List<Lop> finishedNodes,
+ List<Lop> execNodes, List<Lop> queuedNodes, List<List<Lop>> jobvec) {

 // only queued nodes with multiple inputs need to be handled.
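+ // (illustrative example: a binary lop whose two inputs were piggybacked into
+ // different MR jobs may force nodes on one input branch to be recomputed in a
+ // later round; single-input lops never trigger this)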
 if (node.getInputs().size() == 1)
@@ -1572,7 +1632,7 @@ public class Dag<N extends Lop>
 LOG.trace(" Before remove nodes for next iteration -- size of execNodes " + execNodes.size());

 // Determine if <code>node</code> has inputs from the same job or multiple jobs
- int jobid = Integer.MIN_VALUE;
+ int jobid = Integer.MIN_VALUE;
 boolean inputs_in_same_job = true;
 for( Lop input : node.getInputs() ) {
 int input_jobid = jobType(input, jobvec);
@@ -1664,7 +1724,6 @@ public class Dag<N extends Lop>
 if(queueit) {
 if( LOG.isTraceEnabled() )
 LOG.trace(" Removing for next iteration (code " + code + "): (" + tmpNode.getID() + ") " + tmpNode.toString());
-
 markedNodes.add(tmpNode);
 }
 }
@@ -1723,23 +1782,19 @@ public class Dag<N extends Lop>
 }
 }

- private boolean branchCanBePiggyBackedReduce(Lop tmpNode, Lop node, ArrayList<Lop> execNodes, ArrayList<Lop> queuedNodes) {
+ private boolean branchCanBePiggyBackedReduce(Lop tmpNode, Lop node, List<Lop> execNodes, List<Lop> queuedNodes) {
 if(node.getExecLocation() != ExecLocation.Reduce)
 return false;
-
 // if tmpNode is a descendant of any queued child of node, then the branch cannot be piggybacked
 for(Lop ni : node.getInputs()) {
 if(queuedNodes.contains(ni) && isChild(tmpNode, ni, IDMap))
 return false;
 }
-
 for( Lop n : execNodes ) {
 if(n.equals(node))
 continue;
-
 if(n.equals(tmpNode) && n.getExecLocation() != ExecLocation.Map && n.getExecLocation() != ExecLocation.MapOrReduce)
 return false;
-
 // check if n is on the branch tmpNode->*->node
 if(isChild(n, node, IDMap) && isChild(tmpNode, n, IDMap)) {
 if(!node.getInputs().contains(tmpNode) // redundant
@@ -1750,7 +1805,7 @@ public class Dag<N extends Lop>
 return true;
 }

- private boolean branchCanBePiggyBackedMap(Lop tmpNode, Lop node, ArrayList<Lop> execNodes, ArrayList<Lop> queuedNodes, ArrayList<Lop> markedNodes) {
+ private boolean branchCanBePiggyBackedMap(Lop tmpNode, Lop node, List<Lop> execNodes, List<Lop> queuedNodes, List<Lop> markedNodes) {
 if(node.getExecLocation() != ExecLocation.Map)
 return false;
@@ -1811,17 +1866,13 @@ public class Dag<N extends Lop>
 * @param queuedNodes list of queued nodes
 * @return true if tmpNode can be piggybacked on node
 */
- private boolean branchCanBePiggyBackedMapAndReduce(Lop tmpNode, Lop node,
- ArrayList<Lop> execNodes, ArrayList<Lop> queuedNodes) {
-
+ private boolean branchCanBePiggyBackedMapAndReduce(Lop tmpNode, Lop node, List<Lop> execNodes, List<Lop> queuedNodes) {
 if (node.getExecLocation() != ExecLocation.MapAndReduce)
 return false;
 JobType jt = JobType.findJobTypeFromLop(node);
-
 for ( Lop n : execNodes ) {
 if (n.equals(node))
 continue;
-
 // Evaluate only nodes on the branch between tmpNode->..->node
 if (n.equals(tmpNode) || (isChild(n, node, IDMap) && isChild(tmpNode, n, IDMap))) {
 if ( hasOtherMapAndReduceParentNode(tmpNode, queuedNodes,node) )
@@ -1836,8 +1887,8 @@ public class Dag<N extends Lop>
 return true;
 }

- private boolean branchHasNoOtherUnExecutedParents(Lop tmpNode, Lop node,
- ArrayList<Lop> execNodes, ArrayList<Lop> finishedNodes) {
+ private boolean branchHasNoOtherUnExecutedParents(Lop tmpNode, Lop node,
+ List<Lop> execNodes, List<Lop> finishedNodes) {

 //if tmpNode has more than one unfinished output, return false
 if(tmpNode.getOutputs().size() > 1)
@@ -1860,7 +1911,7 @@ public class Dag<N extends Lop>
 {
 int cnt = 0;
 for (Lop output : n.getOutputs() ) {
- if (!finishedNodes.contains(output))
+ if (!finishedNodes.contains(output))
 cnt++;
 }

@@ -1879,7 +1930,7 @@ public class Dag<N extends Lop>
 * @param jobvec list of lists of low-level operators
 *
@return job index for a low-level operator
 */
- private static int jobType(Lop lops, ArrayList<ArrayList<Lop>> jobvec) {
+ private static int jobType(Lop lops, List<List<Lop>> jobvec) {
 for ( JobType jt : JobType.values()) {
 int i = jt.getId();
 if (i > 0 && jobvec.get(i) != null && jobvec.get(i).contains(lops)) {
@@ -1898,12 +1949,9 @@ public class Dag<N extends Lop>
 * @param node low-level operator
 * @return true if there is a MapAndReduce node between tmpNode and node in nodeList
 */
- private boolean hasOtherMapAndReduceParentNode(Lop tmpNode,
- ArrayList<Lop> nodeList, Lop node) {
-
+ private boolean hasOtherMapAndReduceParentNode(Lop tmpNode, List<Lop> nodeList, Lop node) {
 if ( tmpNode.getExecLocation() == ExecLocation.MapAndReduce)
 return true;
-
 for ( Lop n : tmpNode.getOutputs() ) {
 if ( nodeList.contains(n) && isChild(n,node,IDMap)) {
 if(!n.equals(node) && n.getExecLocation() == ExecLocation.MapAndReduce)
@@ -1912,7 +1960,6 @@ public class Dag<N extends Lop>
 return hasOtherMapAndReduceParentNode(n, nodeList, node);
 }
 }
-
 return false;
 }

@@ -1924,7 +1971,7 @@ public class Dag<N extends Lop>
 * @param node low-level operator
 * @return true if there is a queued node that is a parent of tmpNode and node
 */
- private boolean hasOtherQueuedParentNode(Lop tmpNode, ArrayList<Lop> queuedNodes, Lop node) {
+ private boolean hasOtherQueuedParentNode(Lop tmpNode, List<Lop> queuedNodes, Lop node) {
 if ( queuedNodes.isEmpty() )
 return false;
@@ -1947,22 +1994,20 @@ public class Dag<N extends Lop>
 *
 * @param jobNodes list of lists of low-level operators
 */
- private static void printJobNodes(ArrayList<ArrayList<Lop>> jobNodes)
- {
- if (LOG.isTraceEnabled()){
- for ( JobType jt : JobType.values() ) {
- int i = jt.getId();
- if (i > 0 && jobNodes.get(i) != null && !jobNodes.get(i).isEmpty() ) {
- LOG.trace(jt.getName() + " Job Nodes:");
-
- for (int j = 0; j < jobNodes.get(i).size(); j++) {
- LOG.trace(" "
- + jobNodes.get(i).get(j).getID() + ") "
- + jobNodes.get(i).get(j).toString());
- }
+ private static void printJobNodes(List<List<Lop>> jobNodes) {
+ if( !LOG.isTraceEnabled() )
+ return;
+ for ( JobType jt : JobType.values() ) {
+ int i = jt.getId();
+ if (i > 0 && jobNodes.get(i) != null && !jobNodes.get(i).isEmpty() ) {
+ LOG.trace(jt.getName() + " Job Nodes:");
+
+ for (int j = 0; j < jobNodes.get(i).size(); j++) {
+ LOG.trace(" "
+ + jobNodes.get(i).get(j).getID() + ") "
+ + jobNodes.get(i).get(j).toString());
+ }
 }
 }
- }
 }

@@ -1973,7 +2018,7 @@ public class Dag<N extends Lop>
 * @param loc exec location
 * @return true if there is a node with the given exec location
 */
- private static boolean hasANode(ArrayList<Lop> nodes, ExecLocation loc) {
+ private static boolean hasANode(List<Lop> nodes, ExecLocation loc) {
 for ( Lop n : nodes ) {
- if (n.getExecLocation() == ExecLocation.RecordReader)
+ if (n.getExecLocation() == loc)
 return true;
 }
 return false;
 }

- private ArrayList<ArrayList<Lop>> splitGMRNodesByRecordReader(ArrayList<Lop> gmrnodes)
+ private List<List<Lop>> splitGMRNodesByRecordReader(List<Lop> gmrnodes)
 {
 // obtain the list of record reader nodes
 ArrayList<Lop> rrnodes = new ArrayList<>();
@@ -1992,7 +2037,7 @@ public class Dag<N extends Lop>
 // We allocate one extra vector to hold lops that do not depend on any
 // recordreader lops
- ArrayList<ArrayList<Lop>> splitGMR = createNodeVectors(rrnodes.size() + 1);
+ List<List<Lop>> splitGMR = createNodeVectors(rrnodes.size() + 1);

 // flags to indicate whether a lop has been added to one of the node vectors
 boolean[] flags = new
boolean[gmrnodes.size()]; @@ -2039,10 +2084,8 @@ public class Dag<N extends Lop> * @param deleteinst list of delete instructions * @param jobNodes list of list of low-level operators */ - private void generateMRJobs(ArrayList<Lop> execNodes, - ArrayList<Instruction> inst, - ArrayList<Instruction> writeinst, - ArrayList<Instruction> deleteinst, ArrayList<ArrayList<Lop>> jobNodes) + private void generateMRJobs(List<Lop> execNodes, List<Instruction> inst, + List<Instruction> writeinst, List<Instruction> deleteinst, List<List<Lop>> jobNodes) { printJobNodes(jobNodes); @@ -2054,7 +2097,7 @@ public class Dag<N extends Lop> continue; int index = jt.getId(); // job id is used as an index into jobNodes - ArrayList<Lop> currNodes = jobNodes.get(index); + List<Lop> currNodes = jobNodes.get(index); // generate MR job if (currNodes != null && !currNodes.isEmpty() ) { @@ -2064,7 +2107,7 @@ public class Dag<N extends Lop> if (jt.allowsRecordReaderInstructions() && hasANode(jobNodes.get(index), ExecLocation.RecordReader)) { // split the nodes by recordReader lops - ArrayList<ArrayList<Lop>> rrlist = splitGMRNodesByRecordReader(jobNodes.get(index)); + List<List<Lop>> rrlist = splitGMRNodesByRecordReader(jobNodes.get(index)); for (int i = 0; i < rrlist.size(); i++) { generateMapReduceInstructions(rrlist.get(i), inst, writeinst, deleteinst, rmvarinst, jt); } @@ -2120,7 +2163,7 @@ public class Dag<N extends Lop> * @param node_v list of nodes * @param exec_n list of nodes */ - private void addParents(Lop node, ArrayList<Lop> node_v, ArrayList<Lop> exec_n) { + private void addParents(Lop node, List<Lop> node_v, List<Lop> exec_n) { for (Lop enode : exec_n ) { if (isChild(node, enode, IDMap)) { if (!node_v.contains(enode)) { @@ -2139,7 +2182,7 @@ public class Dag<N extends Lop> * @param node_v list of nodes * @param exec_n list of nodes */ - private static void addChildren(Lop node, ArrayList<Lop> node_v, ArrayList<Lop> exec_n) { + private static void addChildren(Lop node, List<Lop> node_v, List<Lop> exec_n) { // add child in exec nodes that is not of type scalar if (exec_n.contains(node) @@ -2624,9 +2667,8 @@ public class Dag<N extends Lop> * @param rmvarinst list of rmvar instructions * @param jt job type */ - private void generateMapReduceInstructions(ArrayList<Lop> execNodes, - ArrayList<Instruction> inst, ArrayList<Instruction> writeinst, ArrayList<Instruction> deleteinst, ArrayList<Instruction> rmvarinst, - JobType jt) + private void generateMapReduceInstructions(List<Lop> execNodes, List<Instruction> inst, + List<Instruction> writeinst, List<Instruction> deleteinst, List<Instruction> rmvarinst, JobType jt) { ArrayList<Byte> resultIndices = new ArrayList<>(); ArrayList<String> inputs = new ArrayList<>(); @@ -2875,7 +2917,7 @@ public class Dag<N extends Lop> * @param inputStrings list of input strings * @return Lop.INSTRUCTION_DELIMITOR separated string */ - private static String getCSVString(ArrayList<String> inputStrings) { + private static String getCSVString(List<String> inputStrings) { StringBuilder sb = new StringBuilder(); for ( String str : inputStrings ) { if( str != null ) { @@ -2902,13 +2944,9 @@ public class Dag<N extends Lop> * @param MRJobLineNumbers MR job line numbers * @return -1 if problem */ - private int getAggAndOtherInstructions(Lop node, ArrayList<Lop> execNodes, - ArrayList<String> shuffleInstructions, - ArrayList<String> aggInstructionsReducer, - ArrayList<String> otherInstructionsReducer, - HashMap<Lop, Integer> nodeIndexMapping, int[] start_index, - ArrayList<String> inputLabels, 
ArrayList<Lop> inputLops,
- ArrayList<Integer> MRJobLineNumbers)
+ private int getAggAndOtherInstructions(Lop node, List<Lop> execNodes, List<String> shuffleInstructions,
+ List<String> aggInstructionsReducer, List<String> otherInstructionsReducer, Map<Lop, Integer> nodeIndexMapping,
+ int[] start_index, List<String> inputLabels, List<Lop> inputLops, List<Integer> MRJobLineNumbers)
 {
 int ret_val = -1;
@@ -2916,7 +2954,6 @@ public class Dag<N extends Lop>
 return nodeIndexMapping.get(node);

 // if not an input source and not in exec nodes, return.
-
 if (!execNodes.contains(node))
 return ret_val;

@@ -3158,12 +3195,9 @@ public class Dag<N extends Lop>
 * @param MRJobLineNumbers MR job line numbers
 * @return -1 if problem
 */
- private static int getRecordReaderInstructions(Lop node, ArrayList<Lop> execNodes,
- ArrayList<String> inputStrings,
- ArrayList<String> recordReaderInstructions,
- HashMap<Lop, Integer> nodeIndexMapping, int[] start_index,
- ArrayList<String> inputLabels, ArrayList<Lop> inputLops,
- ArrayList<Integer> MRJobLineNumbers)
+ private static int getRecordReaderInstructions(Lop node, List<Lop> execNodes, List<String> inputStrings,
+ List<String> recordReaderInstructions, Map<Lop, Integer> nodeIndexMapping, int[] start_index,
+ List<String> inputLabels, List<Lop> inputLops, List<Integer> MRJobLineNumbers)
 {
 // if input source, return index
 if (nodeIndexMapping.containsKey(node))
@@ -3258,12 +3292,9 @@ public class Dag<N extends Lop>
 * @param MRJobLineNumbers MR job line numbers
 * @return -1 if problem
 */
- private int getMapperInstructions(Lop node, ArrayList<Lop> execNodes,
- ArrayList<String> inputStrings,
- ArrayList<String> instructionsInMapper,
- HashMap<Lop, Integer> nodeIndexMapping, int[] start_index,
- ArrayList<String> inputLabels, ArrayList<Lop> inputLops,
- ArrayList<Integer> MRJobLineNumbers)
+ private int getMapperInstructions(Lop node, List<Lop> execNodes, List<String> inputStrings,
+ List<String> instructionsInMapper, Map<Lop, Integer> nodeIndexMapping, int[] start_index,
+ List<String> inputLabels, List<Lop> inputLops, List<Integer> MRJobLineNumbers)
 {
 // if input source, return index
 if (nodeIndexMapping.containsKey(node))
@@ -3393,12 +3424,10 @@ public class Dag<N extends Lop>
 }

 // Method to populate inputs and also populate the node index mapping.
- private static void getInputPathsAndParameters(Lop node, ArrayList<Lop> execNodes,
- ArrayList<String> inputStrings, ArrayList<InputInfo> inputInfos,
- ArrayList<Long> numRows, ArrayList<Long> numCols,
- ArrayList<Long> numRowsPerBlock, ArrayList<Long> numColsPerBlock,
- HashMap<Lop, Integer> nodeIndexMapping, ArrayList<String> inputLabels,
- ArrayList<Lop> inputLops, ArrayList<Integer> MRJobLineNumbers) {
+ private static void getInputPathsAndParameters(Lop node, List<Lop> execNodes,
+ List<String> inputStrings, List<InputInfo> inputInfos, List<Long> numRows, List<Long> numCols,
+ List<Long> numRowsPerBlock, List<Long> numColsPerBlock, Map<Lop, Integer> nodeIndexMapping,
+ List<String> inputLabels, List<Lop> inputLops, List<Integer> MRJobLineNumbers) {
 // treat rand as an input.
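+ // (i.e., a DataGen lop is assigned an input index and label below, just like
+ // a persistent read, so the job sees its generated data as a regular input)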
if (node.getType() == Type.DataGen && execNodes.contains(node) && !nodeIndexMapping.containsKey(node)) { @@ -3514,7 +3543,7 @@ public class Dag<N extends Lop> * @param rootNodes list of root nodes * @param jt job type */ - private static void getOutputNodes(ArrayList<Lop> execNodes, ArrayList<Lop> rootNodes, JobType jt) { + private static void getOutputNodes(List<Lop> execNodes, List<Lop> rootNodes, JobType jt) { for ( Lop node : execNodes ) { // terminal node if (node.getOutputs().isEmpty() && !rootNodes.contains(node)) { @@ -3554,93 +3583,10 @@ public class Dag<N extends Lop> * @param IDMap id map * @return true if a child of b */ - private static boolean isChild(Lop a, Lop b, HashMap<Long, Integer> IDMap) { + private static boolean isChild(Lop a, Lop b, Map<Long, Integer> IDMap) { int bID = IDMap.get(b.getID()); return a.get_reachable()[bID]; } - - /** - * Sort the lops by topological order. - * - * 1) All nodes with level i appear prior to the nodes in level i+1. - * 2) All nodes within a level are ordered by their ID i.e., in the order - * they are created - * - * @param v list of lops - */ - @SuppressWarnings({ "unchecked", "rawtypes" }) - private ArrayList<Lop> doTopologicalSortStrictOrder(ArrayList<Lop> v) { - /* - * Step 1: compute the level for each node in the DAG. Level for each node is - * computed as lops are created. So, this step is need not be performed here. - * Step 2: sort the nodes by level, and within a level by node ID. - */ - - // Step1: Performed at the time of creating Lops - - // Step2: sort nodes by level, and then by node ID - Lop[] nodearray = v.toArray(new Lop[0]); - Arrays.sort(nodearray, new LopComparator()); - - return createIDMapping(nodearray); - } - - private ArrayList<Lop> doTopologicalSortTwoLevelOrder(ArrayList<Lop> v) { - //partition nodes into leaf/inner nodes and dag root nodes, - //+ sort leaf/inner nodes by ID to force depth-first scheduling - //+ append root nodes in order of their original definition - // (which also preserves the original order of prints) - Lop[] nodearray = Stream.concat( - v.stream().filter(l -> !l.getOutputs().isEmpty()).sorted(Comparator.comparing(l -> l.getID())), - v.stream().filter(l -> l.getOutputs().isEmpty())) - .toArray(Lop[]::new); - - return createIDMapping(nodearray); - } - - private ArrayList<Lop> createIDMapping(Lop[] nodearray) { - // Copy sorted nodes into "v" and construct a mapping between Lop IDs and sequence of numbers - ArrayList<Lop> ret = new ArrayList<>(); - IDMap.clear(); - - for (int i = 0; i < nodearray.length; i++) { - ret.add(nodearray[i]); - IDMap.put(nodearray[i].getID(), i); - } - - /* - * Compute of All-pair reachability graph (Transitive Closure) of the DAG. 
- * - Perform a depth-first search (DFS) from every node $u$ in the DAG - * - and construct the list of reachable nodes from the node $u$ - * - store the constructed reachability information in $u$.reachable[] boolean array - */ - for (int i = 0; i < nodearray.length; i++) { - dagDFS(nodearray[i], nodearray[i] - .create_reachable(nodearray.length)); - } - - // print the nodes in sorted order - if (LOG.isTraceEnabled()) { - for ( Lop vnode : ret ) { - StringBuilder sb = new StringBuilder(); - sb.append(vnode.getID()); - sb.append("("); - sb.append(vnode.getLevel()); - sb.append(") "); - sb.append(vnode.getType()); - sb.append("("); - for(Lop vin : vnode.getInputs()) { - sb.append(vin.getID()); - sb.append(","); - } - sb.append("), "); - LOG.trace(sb.toString()); - } - LOG.trace("topological sort -- done"); - } - - return ret; - } /** * Method to perform depth-first traversal from a given node in the DAG. @@ -3663,7 +3609,7 @@ public class Dag<N extends Lop> } } - private static boolean hasDirectChildNode(Lop node, ArrayList<Lop> childNodes) { + private static boolean hasDirectChildNode(Lop node, List<Lop> childNodes) { if ( childNodes.isEmpty() ) return false; for( Lop cnode : childNodes ) { @@ -3673,11 +3619,11 @@ public class Dag<N extends Lop> return false; } - private boolean hasChildNode(Lop node, ArrayList<Lop> nodes) { + private boolean hasChildNode(Lop node, List<Lop> nodes) { return hasChildNode(node, nodes, ExecLocation.INVALID); } - private boolean hasChildNode(Lop node, ArrayList<Lop> childNodes, ExecLocation type) { + private boolean hasChildNode(Lop node, List<Lop> childNodes, ExecLocation type) { if ( childNodes.isEmpty() ) return false; int index = IDMap.get(node.getID()); @@ -3688,7 +3634,7 @@ public class Dag<N extends Lop> return false; } - private Lop getChildNode(Lop node, ArrayList<Lop> childNodes, ExecLocation type) { + private Lop getChildNode(Lop node, List<Lop> childNodes, ExecLocation type) { if ( childNodes.isEmpty() ) return null; int index = IDMap.get(node.getID()); @@ -3708,7 +3654,7 @@ public class Dag<N extends Lop> * Returns null if no such "n" exists * */ - private Lop getParentNode(Lop node, ArrayList<Lop> parentNodes, ExecLocation type) { + private Lop getParentNode(Lop node, List<Lop> parentNodes, ExecLocation type) { if ( parentNodes.isEmpty() ) return null; for( Lop pn : parentNodes ) { @@ -3721,7 +3667,7 @@ public class Dag<N extends Lop> // Checks if "node" has any descendants in nodesVec with definedMRJob flag // set to true - private boolean hasMRJobChildNode(Lop node, ArrayList<Lop> nodesVec) { + private boolean hasMRJobChildNode(Lop node, List<Lop> nodesVec) { if ( nodesVec.isEmpty() ) return false; @@ -3733,7 +3679,7 @@ public class Dag<N extends Lop> return false; } - private boolean checkDataGenAsChildNode(Lop node, ArrayList<Lop> nodesVec) { + private boolean checkDataGenAsChildNode(Lop node, List<Lop> nodesVec) { if( nodesVec.isEmpty() ) return true; @@ -3747,7 +3693,7 @@ public class Dag<N extends Lop> return onlyDatagen; } - private static int getChildAlignment(Lop node, ArrayList<Lop> execNodes, ExecLocation type) + private static int getChildAlignment(Lop node, List<Lop> execNodes, ExecLocation type) { for (Lop n : node.getInputs() ) { if (!execNodes.contains(n)) @@ -3780,7 +3726,7 @@ public class Dag<N extends Lop> return MRCHILD_NOT_FOUND; } - private boolean hasParentNode(Lop node, ArrayList<Lop> parentNodes) { + private boolean hasParentNode(Lop node, List<Lop> parentNodes) { if ( parentNodes.isEmpty() ) return false; for( Lop pnode 
: parentNodes ) { @@ -3798,9 +3744,9 @@ public class Dag<N extends Lop> * @param insts list of instructions * @return new list of potentially modified instructions */ - private static ArrayList<Instruction> cleanupInstructions(ArrayList<Instruction> insts) { + private static ArrayList<Instruction> cleanupInstructions(List<Instruction> insts) { //step 1: create mvvar instructions: assignvar s1 s2, rmvar s1 -> mvvar s1 s2 - ArrayList<Instruction> tmp1 = collapseAssignvarAn <TRUNCATED>
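+ // (for reference, the step-1 rewrite collapses adjacent pairs such as
+ //   assignvar _mVar1 _mVar2 ; rmvar _mVar1   ==>   mvvar _mVar1 _mVar2 ,
+ // where the variable names are hypothetical examples)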
