Fixed AIRAVATA-1571 and refactored the code
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/8835097e Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/8835097e Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/8835097e Branch: refs/heads/master Commit: 8835097ed697583601bde6039801a95dbdd33422 Parents: 55319c9 Author: shamrath <[email protected]> Authored: Mon Feb 23 14:14:26 2015 -0500 Committer: shamrath <[email protected]> Committed: Mon Feb 23 14:14:26 2015 -0500 ---------------------------------------------------------------------- .../engine/SimpleWorkflowInterpreter.java | 50 +++++++++++--------- .../engine/dag/nodes/ApplicationNodeImpl.java | 28 ++++++----- .../engine/dag/nodes/WorkflowInputNodeImpl.java | 13 ++--- .../workflow/engine/dag/nodes/WorkflowNode.java | 10 ++-- .../dag/nodes/WorkflowOutputNodeImpl.java | 15 +++--- .../engine/parser/AiravataDefaultParser.java | 14 +++--- 6 files changed, 70 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/8835097e/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/SimpleWorkflowInterpreter.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/SimpleWorkflowInterpreter.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/SimpleWorkflowInterpreter.java index 3c2596d..93b3bc0 100644 --- a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/SimpleWorkflowInterpreter.java +++ b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/SimpleWorkflowInterpreter.java @@ -102,9 +102,6 @@ public class SimpleWorkflowInterpreter implements Runnable{ setWorkflowInputNodes(workflowParser.parse()); log.debug("Parsed the workflow and got the 
workflow input nodes"); processWorkflowInputNodes(getWorkflowInputNodes()); -// processReadyList(); - // process workflow application nodes - // process workflow output nodes } // try to remove synchronization tag @@ -113,7 +110,8 @@ public class SimpleWorkflowInterpreter implements Runnable{ try { if (readyNode instanceof WorkflowOutputNode) { WorkflowOutputNode wfOutputNode = (WorkflowOutputNode) readyNode; - completeWorkflowOutputs.add(wfOutputNode); + wfOutputNode.getOutputObject().setValue(wfOutputNode.getInPort().getInputObject().getValue()); + addToCompleteOutputNodeList(wfOutputNode); continue; } WorkflowNodeDetails workflowNodeDetails = createWorkflowNodeDetails(readyNode); @@ -128,6 +126,7 @@ public class SimpleWorkflowInterpreter implements Runnable{ } } + private void publishToProcessQueue(TaskDetails process) { Thread thread = new Thread(new TempPublisher(process, eventBus)); thread.start(); @@ -140,13 +139,13 @@ public class SimpleWorkflowInterpreter implements Runnable{ if (workflowNode instanceof ApplicationNode) { ApplicationNode applicationNode = (ApplicationNode) workflowNode; List<InPort> inputPorts = applicationNode.getInputPorts(); - if (applicationNode.getNodeName().equals("Add")) { + if (applicationNode.getName().equals("Add")) { applicationNode.getOutputPorts().get(0).getOutputObject().setValue(String.valueOf( Integer.parseInt(inputPorts.get(0).getInputObject().getValue()) + Integer.parseInt(inputPorts.get(1).getInputObject().getValue()))); - } else if (applicationNode.getNodeName().equals("Multiply")) { + } else if (applicationNode.getName().equals("Multiply")) { applicationNode.getOutputPorts().get(0).getOutputObject().setValue(String.valueOf( Integer.parseInt(inputPorts.get(0).getInputObject().getValue()) * Integer.parseInt(inputPorts.get(1).getInputObject().getValue()))); - } else if (applicationNode.getNodeName().equals("Subtract")) { + } else if (applicationNode.getName().equals("Subtract")) { 
applicationNode.getOutputPorts().get(0).getOutputObject().setValue(String.valueOf( Integer.parseInt(inputPorts.get(0).getInputObject().getValue()) - Integer.parseInt(inputPorts.get(1).getInputObject().getValue()))); } else { @@ -178,7 +177,7 @@ public class SimpleWorkflowInterpreter implements Runnable{ } private WorkflowNodeDetails createWorkflowNodeDetails(WorkflowNode readyNode) throws RegistryException { - WorkflowNodeDetails wfNodeDetails = ExperimentModelUtil.createWorkflowNode(readyNode.getNodeId(), null); + WorkflowNodeDetails wfNodeDetails = ExperimentModelUtil.createWorkflowNode(readyNode.getId(), null); ExecutionUnit executionUnit = ExecutionUnit.APPLICATION; String executionData = null; if (readyNode instanceof ApplicationNode) { @@ -218,15 +217,15 @@ public class SimpleWorkflowInterpreter implements Runnable{ Set<WorkflowNode> tempNodeSet = new HashSet<WorkflowNode>(); for (WorkflowInputNode wfInputNode : wfInputNodes) { if (wfInputNode.isReady()) { - log.debug("Workflow node : " + wfInputNode.getNodeId() + " is ready to execute"); + log.debug("Workflow node : " + wfInputNode.getId() + " is ready to execute"); for (Edge edge : wfInputNode.getOutPort().getOutEdges()) { edge.getToPort().getInputObject().setValue(wfInputNode.getInputObject().getValue()); if (edge.getToPort().getNode().isReady()) { addToReadyQueue(edge.getToPort().getNode()); - log.debug("Added workflow node : " + edge.getToPort().getNode().getNodeId() + " to the readyQueue"); + log.debug("Added workflow node : " + edge.getToPort().getNode().getId() + " to the readyQueue"); } else { addToWaitingQueue(edge.getToPort().getNode()); - log.debug("Added workflow node " + edge.getToPort().getNode().getNodeId() + " to the waitingQueue"); + log.debug("Added workflow node " + edge.getToPort().getNode().getId() + " to the waitingQueue"); } } @@ -311,25 +310,25 @@ public class SimpleWorkflowInterpreter implements Runnable{ case STARTED: break; case PRE_PROCESSING: - 
processPack.getWorkflowNode().setNodeState(NodeState.PRE_PROCESSING); + processPack.getWorkflowNode().setState(NodeState.PRE_PROCESSING); break; case INPUT_DATA_STAGING: - processPack.getWorkflowNode().setNodeState(NodeState.PRE_PROCESSING); + processPack.getWorkflowNode().setState(NodeState.PRE_PROCESSING); break; case EXECUTING: - processPack.getWorkflowNode().setNodeState(NodeState.EXECUTING); + processPack.getWorkflowNode().setState(NodeState.EXECUTING); break; case OUTPUT_DATA_STAGING: - processPack.getWorkflowNode().setNodeState(NodeState.POST_PROCESSING); + processPack.getWorkflowNode().setState(NodeState.POST_PROCESSING); break; case POST_PROCESSING: - processPack.getWorkflowNode().setNodeState(NodeState.POST_PROCESSING); + processPack.getWorkflowNode().setState(NodeState.POST_PROCESSING); break; case COMPLETED: - processPack.getWorkflowNode().setNodeState(NodeState.EXECUTED); + processPack.getWorkflowNode().setState(NodeState.EXECUTED); break; case FAILED: - processPack.getWorkflowNode().setNodeState(NodeState.FAILED); + processPack.getWorkflowNode().setState(NodeState.FAILED); break; case UNKNOWN: break; @@ -337,7 +336,7 @@ public class SimpleWorkflowInterpreter implements Runnable{ break; case CANCELED: case CANCELING: - processPack.getWorkflowNode().setNodeState(NodeState.FAILED); + processPack.getWorkflowNode().setState(NodeState.FAILED); break; default: break; @@ -358,12 +357,12 @@ public class SimpleWorkflowInterpreter implements Runnable{ * @param workflowNode - Workflow Node */ private synchronized void addToReadyQueue(WorkflowNode workflowNode) { - waitingList.remove(workflowNode.getNodeId()); - readList.put(workflowNode.getNodeId(), workflowNode); + waitingList.remove(workflowNode.getId()); + readList.put(workflowNode.getId(), workflowNode); } private void addToWaitingQueue(WorkflowNode workflowNode) { - waitingList.put(workflowNode.getNodeId(), workflowNode); + waitingList.put(workflowNode.getId(), workflowNode); } /** @@ -372,7 +371,7 @@ public 
class SimpleWorkflowInterpreter implements Runnable{ * @param processPack - has both workflow and correspond workflowNodeDetails and TaskDetails */ private synchronized void addToProcessingQueue(ProcessPack processPack) { - readList.remove(processPack.getWorkflowNode().getNodeId()); + readList.remove(processPack.getWorkflowNode().getId()); processingQueue.put(processPack.getTaskDetails().getTaskID(), processPack); } @@ -382,6 +381,11 @@ public class SimpleWorkflowInterpreter implements Runnable{ } + private void addToCompleteOutputNodeList(WorkflowOutputNode wfOutputNode) { + completeWorkflowOutputs.add(wfOutputNode); + readList.remove(wfOutputNode.getId()); + } + @Override public void run() { // TODO: Auto generated method body. http://git-wip-us.apache.org/repos/asf/airavata/blob/8835097e/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/ApplicationNodeImpl.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/ApplicationNodeImpl.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/ApplicationNodeImpl.java index dd4415a..1282dd0 100644 --- a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/ApplicationNodeImpl.java +++ b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/ApplicationNodeImpl.java @@ -34,40 +34,46 @@ public class ApplicationNodeImpl implements ApplicationNode { private String applicationId; private List<InPort> inPorts = new ArrayList<InPort>(); private List<OutPort> outPorts = new ArrayList<OutPort>(); + private String applicationName; - public ApplicationNodeImpl(String nodeId) { - this(nodeId, null); - } +// public ApplicationNodeImpl(String nodeId) { +// this(nodeId, null); +// } +// +// public ApplicationNodeImpl(String nodeId, String applicationId) { +// 
this(nodeId, null, applicationId); +// } - public ApplicationNodeImpl(String nodeId, String applicationId) { + public ApplicationNodeImpl(String nodeId, String applicationName, String applicationId) { this.nodeId = nodeId; + this.applicationName = applicationName; this.applicationId = applicationId; } @Override - public String getNodeId() { + public String getId() { return this.nodeId; } @Override - public String getNodeName() { - return this.getNodeName(); + public String getName() { + return applicationName; } @Override - public NodeType getNodeType() { + public NodeType getType() { return NodeType.APPLICATION; } @Override - public NodeState getNodeState() { + public NodeState getState() { return myState; } @Override - public void setNodeState(NodeState newNodeState) { + public void setState(NodeState newState) { // TODO: node state can't be reversed , correct order WAITING --> READY --> EXECUTING --> EXECUTED --> COMPLETE - myState = newNodeState; + myState = newState; } @Override http://git-wip-us.apache.org/repos/asf/airavata/blob/8835097e/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java index f419ae2..a015909 100644 --- a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java +++ b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java @@ -29,6 +29,7 @@ public class WorkflowInputNodeImpl implements WorkflowInputNode { private String nodeName; private OutPort outPort; private InputDataObjectType inputDataObjectType; + private String name; public 
WorkflowInputNodeImpl(String nodeId) { this(nodeId, null); @@ -40,29 +41,29 @@ public class WorkflowInputNodeImpl implements WorkflowInputNode { } @Override - public String getNodeId() { + public String getId() { return this.nodeId; } @Override - public String getNodeName() { + public String getName() { return this.nodeName; } @Override - public NodeType getNodeType() { + public NodeType getType() { return NodeType.WORKFLOW_INPUT; } @Override - public NodeState getNodeState() { + public NodeState getState() { return myState; } @Override - public void setNodeState(NodeState newNodeState) { + public void setState(NodeState newState) { // TODO: node state can't be reversed , correct order WAITING --> READY --> EXECUTING --> EXECUTED --> COMPLETE - myState = newNodeState; + myState = newState; } @Override http://git-wip-us.apache.org/repos/asf/airavata/blob/8835097e/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowNode.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowNode.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowNode.java index f8b0e0c..f875674 100644 --- a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowNode.java +++ b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowNode.java @@ -23,15 +23,15 @@ package org.apache.ariavata.simple.workflow.engine.dag.nodes; public interface WorkflowNode { - public String getNodeId(); + public String getId(); - public String getNodeName(); + public String getName(); - public NodeType getNodeType(); + public NodeType getType(); - public NodeState getNodeState(); + public NodeState getState(); - public void setNodeState(NodeState newNodeState); + public void setState(NodeState newState); 
public boolean isReady(); http://git-wip-us.apache.org/repos/asf/airavata/blob/8835097e/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowOutputNodeImpl.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowOutputNodeImpl.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowOutputNodeImpl.java index aa7f0a3..a44c05f 100644 --- a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowOutputNodeImpl.java +++ b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowOutputNodeImpl.java @@ -42,34 +42,35 @@ public class WorkflowOutputNodeImpl implements WorkflowOutputNode { } @Override - public String getNodeId() { + public String getId() { return this.nodeId; } @Override - public String getNodeName() { + public String getName() { return this.nodeName; } @Override - public NodeType getNodeType() { + public NodeType getType() { return NodeType.WORKFLOW_OUTPUT; } @Override - public NodeState getNodeState() { + public NodeState getState() { return myState; } @Override - public void setNodeState(NodeState newNodeState) { + public void setState(NodeState newState) { // TODO: node state can't be reversed , correct order WAITING --> READY --> EXECUTING --> EXECUTED --> COMPLETE - myState = newNodeState; + myState = newState; } @Override public boolean isReady() { - return this.outputDataObjectType.getValue() != null && !this.outputDataObjectType.getValue().equals(""); + return !(inPort.getInputObject() == null || inPort.getInputObject().getValue() == null + || inPort.getInputObject().getValue().equals("")); } @Override 
http://git-wip-us.apache.org/repos/asf/airavata/blob/8835097e/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/parser/AiravataDefaultParser.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/parser/AiravataDefaultParser.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/parser/AiravataDefaultParser.java index e7ac5cb..2961fde 100644 --- a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/parser/AiravataDefaultParser.java +++ b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/parser/AiravataDefaultParser.java @@ -21,7 +21,6 @@ package org.apache.ariavata.simple.workflow.engine.parser; -import com.sun.corba.se.pept.encoding.OutputObject; import org.airavata.appcatalog.cpi.AppCatalogException; import org.airavata.appcatalog.cpi.WorkflowCatalog; import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory; @@ -104,7 +103,7 @@ public class AiravataDefaultParser implements WorkflowParser { } for (Node gNode : gNodes) { wfInputNode = new WorkflowInputNodeImpl(gNode.getID(), gNode.getName()); - wfInputNode.setInputObject(inputDataMap.get(wfInputNode.getNodeName())); + wfInputNode.setInputObject(inputDataMap.get(wfInputNode.getName())); if (wfInputNode.getInputObject() == null) { // TODO: throw an error and exit. } @@ -120,7 +119,7 @@ public class AiravataDefaultParser implements WorkflowParser { private void buildModel(List<PortContainer> portContainerList) { // end condition of recursive call. 
- if (portContainerList == null || portContainerList.size() == 0) { + if (portContainerList == null || portContainerList.isEmpty()) { return ; } DataPort dataPort = null; @@ -132,13 +131,12 @@ public class AiravataDefaultParser implements WorkflowParser { dataPort = portContainer.getDataPort(); inPort = portContainer.getInPort(); Node node = dataPort.getNode(); -// inPort.setInputObject(getInputDataObject(dataPort)); if (node instanceof WSNode) { WSNode wsNode = (WSNode) node; WorkflowNode wfNode = wfNodes.get(wsNode.getID()); if (wfNode == null) { wfApplicationNode = createApplicationNode(wsNode); - wfNodes.put(wfApplicationNode.getNodeId(), wfApplicationNode); + wfNodes.put(wfApplicationNode.getId(), wfApplicationNode); nextPortContainerList.addAll(processOutPorts(wsNode, wfApplicationNode)); } else if (wfNode instanceof ApplicationNode) { wfApplicationNode = (ApplicationNode) wfNode; @@ -152,7 +150,8 @@ public class AiravataDefaultParser implements WorkflowParser { OutputNode oNode = (OutputNode) node; wfOutputNode = createWorkflowOutputNode(oNode); wfOutputNode.setInPort(inPort); - wfNodes.put(wfOutputNode.getNodeId(), wfOutputNode); + inPort.setNode(wfOutputNode); + wfNodes.put(wfOutputNode.getId(), wfOutputNode); } } buildModel(nextPortContainerList); @@ -169,8 +168,8 @@ public class AiravataDefaultParser implements WorkflowParser { private ApplicationNode createApplicationNode(WSNode wsNode) { ApplicationNode applicationNode = new ApplicationNodeImpl(wsNode.getID(), + wsNode.getComponent().getApplication().getName(), wsNode.getComponent().getApplication().getApplicationId()); -// wsNode.getComponent().getInputPorts() return applicationNode; } @@ -197,7 +196,6 @@ public class AiravataDefaultParser implements WorkflowParser { } else if (wfNode instanceof ApplicationNode) { ApplicationNode applicationNode = ((ApplicationNode) wfNode); applicationNode.addOutPort(outPort); -// applicationNode.addInPort(inPort); } } return portContainers;
