Fixed AIRAVATA-1618 and optimized the imports.
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/917adad5 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/917adad5 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/917adad5 Branch: refs/heads/master Commit: 917adad5bbec9500a0524eeb7a3ce3763c89d961 Parents: 71db390 Author: shamrath <[email protected]> Authored: Wed Mar 4 14:21:08 2015 -0500 Committer: shamrath <[email protected]> Committed: Wed Mar 4 14:21:08 2015 -0500 ---------------------------------------------------------------------- .../engine/SimpleWorkflowInterpreter.java | 112 ++++++++++--------- .../workflow/engine/WorkflowFactoryImpl.java | 2 - .../workflow/engine/dag/edge/DirectedEdge.java | 2 +- .../engine/dag/nodes/ApplicationNodeImpl.java | 7 +- .../workflow/engine/dag/nodes/NodeState.java | 28 +++-- .../engine/dag/nodes/WorkflowInputNodeImpl.java | 7 +- .../dag/nodes/WorkflowOutputNodeImpl.java | 7 +- .../engine/parser/AiravataWorkflowParser.java | 24 ++-- .../workflow/engine/parser/PortContainer.java | 2 +- .../parser/AiravataWorkflowParserTest.java | 2 +- 10 files changed, 111 insertions(+), 82 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/917adad5/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java index 8eb5d5e..a052e5c 100644 --- a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java +++ 
b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java @@ -55,7 +55,6 @@ import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowNode; import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowOutputNode; import org.apache.airavata.simple.workflow.engine.dag.port.InPort; import org.apache.airavata.simple.workflow.engine.dag.port.OutPort; -import org.apache.airavata.simple.workflow.engine.parser.AiravataWorkflowParser; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,7 +77,7 @@ public class SimpleWorkflowInterpreter implements Runnable{ private String gatewayName; - private Map<String, WorkflowNode> readList = new ConcurrentHashMap<String, WorkflowNode>(); + private Map<String, WorkflowNode> readyList = new ConcurrentHashMap<String, WorkflowNode>(); private Map<String, WorkflowNode> waitingList = new ConcurrentHashMap<String, WorkflowNode>(); private Map<String, ProcessContext> processingQueue = new ConcurrentHashMap<String, ProcessContext>(); private Map<String, ProcessContext> completeList = new HashMap<String, ProcessContext>(); @@ -87,6 +86,7 @@ public class SimpleWorkflowInterpreter implements Runnable{ private RabbitMQProcessPublisher publisher; private RabbitMQStatusConsumer statusConsumer; private String consumerId; + private boolean continueWorkflow = true; public SimpleWorkflowInterpreter(String experimentId, String credentialToken, String gatewayName, RabbitMQProcessPublisher publisher) throws RegistryException { this.gatewayName = gatewayName; @@ -111,33 +111,27 @@ public class SimpleWorkflowInterpreter implements Runnable{ log.debug("Parsed the workflow and got the workflow input nodes"); // process workflow input nodes processWorkflowInputNodes(getWorkflowInputNodes()); - - + // initialize the rabbitmq status consumer statusConsumer = new RabbitMQStatusConsumer(); consumerId = statusConsumer.listen(new TaskMessageHandler()); + + processReadyList(); } // try 
to remove synchronization tag - private synchronized void processReadyList() { - for (WorkflowNode readyNode : readList.values()) { - try { - if (readyNode instanceof WorkflowOutputNode) { - WorkflowOutputNode wfOutputNode = (WorkflowOutputNode) readyNode; - wfOutputNode.getOutputObject().setValue(wfOutputNode.getInPort().getInputObject().getValue()); - addToCompleteOutputNodeList(wfOutputNode); - continue; - } - WorkflowNodeDetails workflowNodeDetails = createWorkflowNodeDetails(readyNode); - TaskDetails process = getProcess(workflowNodeDetails); - ProcessContext processContext = new ProcessContext(readyNode, workflowNodeDetails, process); - addToProcessingQueue(processContext); - publishToProcessQueue(process); -// publishToProcessQueue(processPack); - } catch (RegistryException e) { - // FIXME : handle this exception - } catch (AiravataException e) { - log.error("Error while publishing process to the process queue"); + private synchronized void processReadyList() throws RegistryException, AiravataException { + for (WorkflowNode readyNode : readyList.values()) { + if (readyNode instanceof WorkflowOutputNode) { + WorkflowOutputNode wfOutputNode = (WorkflowOutputNode) readyNode; + wfOutputNode.getOutputObject().setValue(wfOutputNode.getInPort().getInputObject().getValue()); + addToCompleteOutputNodeList(wfOutputNode); + continue; } + WorkflowNodeDetails workflowNodeDetails = createWorkflowNodeDetails(readyNode); + TaskDetails process = getProcess(workflowNodeDetails); + ProcessContext processContext = new ProcessContext(readyNode, workflowNodeDetails, process); + addToProcessingQueue(processContext); + publishToProcessQueue(process); } } @@ -149,11 +143,6 @@ public class SimpleWorkflowInterpreter implements Runnable{ MessageContext messageContext = new MessageContext(processSubmitEvent, MessageType.TASK, process.getTaskID(), null); messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); publisher.publish(messageContext); - - -// Thread thread = new 
Thread(new TempPublisher(process, eventBus)); -// thread.start(); - //TODO: publish to process queue. } private TaskDetails getProcess(WorkflowNodeDetails wfNodeDetails) throws RegistryException { @@ -171,6 +160,7 @@ public class SimpleWorkflowInterpreter implements Runnable{ if (readyNode instanceof ApplicationNode) { executionUnit = ExecutionUnit.APPLICATION; executionData = ((ApplicationNode) readyNode).getApplicationId(); + setupNodeDetailsInput(((ApplicationNode) readyNode), wfNodeDetails); } else if (readyNode instanceof WorkflowInputNode) { executionUnit = ExecutionUnit.INPUT; } else if (readyNode instanceof WorkflowOutputNode) { @@ -178,25 +168,19 @@ public class SimpleWorkflowInterpreter implements Runnable{ } wfNodeDetails.setExecutionUnit(executionUnit); wfNodeDetails.setExecutionUnitData(executionData); - setupNodeDetailsInput(readyNode, wfNodeDetails); wfNodeDetails.setNodeInstanceId((String) getRegistry() .add(ChildDataType.WORKFLOW_NODE_DETAIL, wfNodeDetails, getExperiment().getExperimentID())); -// nodeInstanceList.put(node, wfNodeDetails); return wfNodeDetails; } - private void setupNodeDetailsInput(WorkflowNode readyNode, WorkflowNodeDetails wfNodeDetails) { - if (readyNode instanceof ApplicationNode) { - ApplicationNode applicationNode = (ApplicationNode) readyNode; - if (applicationNode.isReady()) { - for (InPort inPort : applicationNode.getInputPorts()) { - wfNodeDetails.addToNodeInputs(inPort.getInputObject()); - } - } else { - // TODO: handle this scenario properly. + private void setupNodeDetailsInput(ApplicationNode readyAppNode, WorkflowNodeDetails wfNodeDetails) { + if (readyAppNode.isReady()) { + for (InPort inPort : readyAppNode.getInputPorts()) { + wfNodeDetails.addToNodeInputs(inPort.getInputObject()); } } else { - // TODO: do we support for other type of workflow nodes ? 
+ throw new IllegalArgumentException("Application node should be in ready state to set inputs to the " + + "workflow node details, nodeId = " + readyAppNode.getId()); } } @@ -253,7 +237,7 @@ public class SimpleWorkflowInterpreter implements Runnable{ */ private synchronized void addToReadyQueue(WorkflowNode workflowNode) { waitingList.remove(workflowNode.getId()); - readList.put(workflowNode.getId(), workflowNode); + readyList.put(workflowNode.getId(), workflowNode); } private void addToWaitingQueue(WorkflowNode workflowNode) { @@ -266,7 +250,7 @@ public class SimpleWorkflowInterpreter implements Runnable{ * @param processContext - has both workflow and correspond workflowNodeDetails and TaskDetails */ private synchronized void addToProcessingQueue(ProcessContext processContext) { - readList.remove(processContext.getWorkflowNode().getId()); + readyList.remove(processContext.getWorkflowNode().getId()); processingQueue.put(processContext.getTaskDetails().getTaskID(), processContext); } @@ -278,7 +262,7 @@ public class SimpleWorkflowInterpreter implements Runnable{ private void addToCompleteOutputNodeList(WorkflowOutputNode wfOutputNode) { completeWorkflowOutputs.add(wfOutputNode); - readList.remove(wfOutputNode.getId()); + readyList.remove(wfOutputNode.getId()); } @Override @@ -286,15 +270,25 @@ public class SimpleWorkflowInterpreter implements Runnable{ try { log.debug("Launching workflow"); launchWorkflow(); - while (!(waitingList.isEmpty() && readList.isEmpty())) { - processReadyList(); + while (continueWorkflow && !(waitingList.isEmpty() && readyList.isEmpty())) { +// processReadyList(); Thread.sleep(1000); } - log.info("Successfully launched workflow for experiment : " + getExperiment().getExperimentID()); - statusConsumer.stopListen(consumerId); - log.info("Successfully un-bind status consumer for experiment " + getExperiment().getExperimentID()); + if (continueWorkflow) { + log.info("Successfully launched workflow for experiment : " + 
getExperiment().getExperimentID()); + } else if (!(waitingList.isEmpty() || readyList.isEmpty())) { + log.error("Workflow couldn't execute all workflow nodes due to an error"); + } } catch (Exception e) { log.error("Error launching workflow", e); + } finally { + try { + statusConsumer.stopListen(consumerId); + log.info("Successfully un-bind status consumer for experiment " + getExperiment().getExperimentID()); + } catch (AiravataException e) { + log.error("Error while un-binding status consumer: " + consumerId + " for experiment " + + getExperiment().getExperimentID()); + } } } @@ -369,6 +363,12 @@ public class SimpleWorkflowInterpreter implements Runnable{ } addToCompleteQueue(processContext); log.debug("removed task from processing queue : " + taskId); + try { + processReadyList(); + } catch (Exception e) { + log.error("Error while processing ready workflow nodes", e); + continueWorkflow = false; + } } } @@ -378,39 +378,49 @@ public class SimpleWorkflowInterpreter implements Runnable{ String taskId = taskIdentity.getTaskId(); ProcessContext processContext = processingQueue.get(taskId); if (processContext != null) { - WorkflowNodeState wfNodeState = WorkflowNodeState.UNKNOWN; + WorkflowNodeState wfNodeState = WorkflowNodeState.INVOKED; switch (taskState) { case WAITING: break; case STARTED: break; case PRE_PROCESSING: + wfNodeState = WorkflowNodeState.INVOKED; processContext.getWorkflowNode().setState(NodeState.PRE_PROCESSING); break; case INPUT_DATA_STAGING: + wfNodeState = WorkflowNodeState.INVOKED; processContext.getWorkflowNode().setState(NodeState.PRE_PROCESSING); break; case EXECUTING: + wfNodeState = WorkflowNodeState.EXECUTING; processContext.getWorkflowNode().setState(NodeState.EXECUTING); break; case OUTPUT_DATA_STAGING: + wfNodeState = WorkflowNodeState.COMPLETED; processContext.getWorkflowNode().setState(NodeState.POST_PROCESSING); break; case POST_PROCESSING: + wfNodeState = WorkflowNodeState.COMPLETED; 
processContext.getWorkflowNode().setState(NodeState.POST_PROCESSING); break; case COMPLETED: + wfNodeState = WorkflowNodeState.COMPLETED; processContext.getWorkflowNode().setState(NodeState.EXECUTED); break; case FAILED: + wfNodeState = WorkflowNodeState.FAILED; processContext.getWorkflowNode().setState(NodeState.FAILED); break; case UNKNOWN: + wfNodeState = WorkflowNodeState.UNKNOWN; break; case CONFIGURING_WORKSPACE: + wfNodeState = WorkflowNodeState.COMPLETED; break; case CANCELED: case CANCELING: + wfNodeState = WorkflowNodeState.CANCELED; processContext.getWorkflowNode().setState(NodeState.FAILED); break; default: @@ -420,7 +430,9 @@ public class SimpleWorkflowInterpreter implements Runnable{ try { updateWorkflowNodeStatus(processContext.getWfNodeDetails(), wfNodeState); } catch (RegistryException e) { - // TODO: handle this. + log.error("Error while updating workflow node status update to the registry. nodeInstanceId :" + + processContext.getWfNodeDetails().getNodeInstanceId() + " status to: " + + processContext.getWfNodeDetails().getWorkflowNodeStatus().toString() , e); } } } http://git-wip-us.apache.org/repos/asf/airavata/blob/917adad5/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactoryImpl.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactoryImpl.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactoryImpl.java index 23fc4c2..e70f062 100644 --- a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactoryImpl.java +++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactoryImpl.java @@ -23,13 +23,11 @@ package org.apache.airavata.simple.workflow.engine; import org.apache.airavata.common.exception.ApplicationSettingsException; import 
org.apache.airavata.common.utils.ServerSettings; -import org.apache.airavata.registry.cpi.RegistryException; import org.apache.airavata.simple.workflow.engine.parser.AiravataWorkflowParser; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; /** * Singleton class, only one instance can exist in runtime. http://git-wip-us.apache.org/repos/asf/airavata/blob/917adad5/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/edge/DirectedEdge.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/edge/DirectedEdge.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/edge/DirectedEdge.java index 3bc380d..ae7498a 100644 --- a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/edge/DirectedEdge.java +++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/edge/DirectedEdge.java @@ -21,8 +21,8 @@ package org.apache.airavata.simple.workflow.engine.dag.edge; -import org.apache.airavata.simple.workflow.engine.dag.port.OutPort; import org.apache.airavata.simple.workflow.engine.dag.port.InPort; +import org.apache.airavata.simple.workflow.engine.dag.port.OutPort; public class DirectedEdge implements Edge { http://git-wip-us.apache.org/repos/asf/airavata/blob/917adad5/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/ApplicationNodeImpl.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/ApplicationNodeImpl.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/ApplicationNodeImpl.java index 52b0595..1233a9d 100644 --- 
a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/ApplicationNodeImpl.java +++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/ApplicationNodeImpl.java @@ -72,8 +72,11 @@ public class ApplicationNodeImpl implements ApplicationNode { @Override public void setState(NodeState newState) { - // TODO: node state can't be reversed , correct order WAITING --> READY --> EXECUTING --> EXECUTED --> COMPLETE - myState = newState; + if (newState.getLevel() > myState.getLevel()) { + myState = newState; + } else { + throw new IllegalStateException("Node state can't be reversed. currentState : " + myState.toString() + " , newState " + newState.toString()); + } } @Override http://git-wip-us.apache.org/repos/asf/airavata/blob/917adad5/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/NodeState.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/NodeState.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/NodeState.java index 333fcb2..edbeec5 100644 --- a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/NodeState.java +++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/NodeState.java @@ -22,13 +22,23 @@ package org.apache.airavata.simple.workflow.engine.dag.nodes; public enum NodeState { - WAITING, // waiting on inputs - READY, // all inputs are available and ready to execute - QUEUED, // - PRE_PROCESSING, // - EXECUTING, // task has been submitted , not yet finish - EXECUTED, // task executed - POST_PROCESSING, // - FAILED, - COMPLETE // all works done + WAITING(0), // waiting on inputs + READY(1), // all inputs are available and ready to execute + QUEUED(2), // + PRE_PROCESSING(3), // + 
EXECUTING(4), // task has been submitted , not yet finish + EXECUTED(5), // task executed + POST_PROCESSING(6), // + FAILED(7), + COMPLETE(8); // all works done + + private int level; + + NodeState(int level) { + this.level = level; + } + + public int getLevel() { + return level; + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/917adad5/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java index b3dfa62..7ba8908 100644 --- a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java +++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java @@ -62,8 +62,11 @@ public class WorkflowInputNodeImpl implements WorkflowInputNode { @Override public void setState(NodeState newState) { - // TODO: node state can't be reversed , correct order WAITING --> READY --> EXECUTING --> EXECUTED --> COMPLETE - myState = newState; + if (newState.getLevel() > myState.getLevel()) { + myState = newState; + } else { + throw new IllegalStateException("Node state can't be reversed. 
currentState : " + myState.toString() + " , newState " + newState.toString()); + } } @Override http://git-wip-us.apache.org/repos/asf/airavata/blob/917adad5/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowOutputNodeImpl.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowOutputNodeImpl.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowOutputNodeImpl.java index 5924212..6c80517 100644 --- a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowOutputNodeImpl.java +++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowOutputNodeImpl.java @@ -63,8 +63,11 @@ public class WorkflowOutputNodeImpl implements WorkflowOutputNode { @Override public void setState(NodeState newState) { - // TODO: node state can't be reversed , correct order WAITING --> READY --> EXECUTING --> EXECUTED --> COMPLETE - myState = newState; + if (newState.getLevel() > myState.getLevel()) { + myState = newState; + } else { + throw new IllegalStateException("Node state can't be reversed. 
currentState : " + myState.toString() + " , newState " + newState.toString()); + } } @Override http://git-wip-us.apache.org/repos/asf/airavata/blob/917adad5/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParser.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParser.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParser.java index a430879..f7d53be 100644 --- a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParser.java +++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParser.java @@ -31,9 +31,19 @@ import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory; import org.apache.airavata.registry.cpi.Registry; import org.apache.airavata.registry.cpi.RegistryException; import org.apache.airavata.registry.cpi.RegistryModelType; +import org.apache.airavata.simple.workflow.engine.WorkflowParser; +import org.apache.airavata.simple.workflow.engine.dag.edge.DirectedEdge; +import org.apache.airavata.simple.workflow.engine.dag.edge.Edge; +import org.apache.airavata.simple.workflow.engine.dag.nodes.ApplicationNode; import org.apache.airavata.simple.workflow.engine.dag.nodes.ApplicationNodeImpl; import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowInputNode; import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowInputNodeImpl; +import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowNode; +import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowOutputNode; +import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowOutputNodeImpl; +import org.apache.airavata.simple.workflow.engine.dag.port.InPort; +import 
org.apache.airavata.simple.workflow.engine.dag.port.InputPortIml; +import org.apache.airavata.simple.workflow.engine.dag.port.OutPort; import org.apache.airavata.simple.workflow.engine.dag.port.OutPortImpl; import org.apache.airavata.workflow.model.component.ComponentException; import org.apache.airavata.workflow.model.component.system.ConstantComponent; @@ -49,16 +59,6 @@ import org.apache.airavata.workflow.model.graph.system.SystemDataPort; import org.apache.airavata.workflow.model.graph.ws.WSNode; import org.apache.airavata.workflow.model.graph.ws.WSPort; import org.apache.airavata.workflow.model.wf.Workflow; -import org.apache.airavata.simple.workflow.engine.WorkflowParser; -import org.apache.airavata.simple.workflow.engine.dag.edge.DirectedEdge; -import org.apache.airavata.simple.workflow.engine.dag.edge.Edge; -import org.apache.airavata.simple.workflow.engine.dag.nodes.ApplicationNode; -import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowNode; -import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowOutputNode; -import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowOutputNodeImpl; -import org.apache.airavata.simple.workflow.engine.dag.port.InPort; -import org.apache.airavata.simple.workflow.engine.dag.port.InputPortIml; -import org.apache.airavata.simple.workflow.engine.dag.port.OutPort; import java.util.ArrayList; import java.util.HashMap; @@ -103,7 +103,7 @@ public class AiravataWorkflowParser implements WorkflowParser { wfInputNode = new WorkflowInputNodeImpl(gNode.getID(), gNode.getName()); wfInputNode.setInputObject(inputDataMap.get(wfInputNode.getId())); if (wfInputNode.getInputObject() == null) { - // TODO: throw an error and exit. 
+ throw new RuntimeException("Workflow Input object is not set, workflow node id: " + wfInputNode.getId()); } portContainers.addAll(processOutPorts(gNode, wfInputNode)); wfInputNodes.add(wfInputNode); @@ -139,7 +139,7 @@ public class AiravataWorkflowParser implements WorkflowParser { } else if (wfNode instanceof ApplicationNode) { wfApplicationNode = (ApplicationNode) wfNode; } else { - // TODO : handle this scenario + throw new IllegalArgumentException("Only support for ApplicationNode implementation, but found other type for node implementation"); } inPort.setNode(wfApplicationNode); wfApplicationNode.addInPort(inPort); http://git-wip-us.apache.org/repos/asf/airavata/blob/917adad5/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/PortContainer.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/PortContainer.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/PortContainer.java index db3dda5..4ddb8b9 100644 --- a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/PortContainer.java +++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/PortContainer.java @@ -21,8 +21,8 @@ package org.apache.airavata.simple.workflow.engine.parser; -import org.apache.airavata.workflow.model.graph.DataPort; import org.apache.airavata.simple.workflow.engine.dag.port.InPort; +import org.apache.airavata.workflow.model.graph.DataPort; public class PortContainer { http://git-wip-us.apache.org/repos/asf/airavata/blob/917adad5/modules/simple-workflow/src/test/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParserTest.java ---------------------------------------------------------------------- diff --git 
a/modules/simple-workflow/src/test/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParserTest.java b/modules/simple-workflow/src/test/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParserTest.java index 6443806..d843abe 100644 --- a/modules/simple-workflow/src/test/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParserTest.java +++ b/modules/simple-workflow/src/test/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParserTest.java @@ -26,9 +26,9 @@ import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; import org.apache.airavata.model.workspace.experiment.Experiment; import org.apache.airavata.simple.workflow.engine.dag.nodes.ApplicationNode; import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowInputNode; +import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowNode; import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowOutputNode; import org.apache.airavata.workflow.model.wf.Workflow; -import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowNode; import org.junit.After; import org.junit.Assert; import org.junit.Before;
