Repository: airavata Updated Branches: refs/heads/master 80d9df511 -> a6c483cdb
Implement Airavata default parser to next iteration. Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/e3ce93e1 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/e3ce93e1 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/e3ce93e1 Branch: refs/heads/master Commit: e3ce93e1f4b78d01125bc06a894533424723d9a6 Parents: c90662f Author: shamrath <[email protected]> Authored: Fri Feb 13 14:56:24 2015 -0500 Committer: shamrath <[email protected]> Committed: Fri Feb 13 14:56:24 2015 -0500 ---------------------------------------------------------------------- modules/simple-workflow/pom.xml | 18 ++ .../workflow/engine/AiravataDefaultParser.java | 111 ---------- .../workflow/engine/SimpleWorkflowEngine.java | 6 + .../engine/SimpleWorkflowInterpreter.java | 20 +- .../workflow/engine/dag/edge/DirectedEdge.java | 31 +++ .../workflow/engine/dag/edge/DirectedLink.java | 40 ---- .../simple/workflow/engine/dag/edge/Edge.java | 24 +- .../engine/dag/nodes/ApplicationNode.java | 8 +- .../engine/dag/nodes/ApplicationNodeImpl.java | 63 ++---- .../engine/dag/nodes/WorkflowInputNode.java | 7 +- .../engine/dag/nodes/WorkflowInputNodeImpl.java | 25 ++- .../dag/nodes/WorkflowOutputNodeImpl.java | 28 ++- .../simple/workflow/engine/dag/port/InPort.java | 5 +- .../workflow/engine/dag/port/InputPortIml.java | 58 ++++- .../workflow/engine/dag/port/OutPort.java | 4 +- .../workflow/engine/dag/port/OutPortImpl.java | 62 ++++++ .../simple/workflow/engine/dag/port/Port.java | 8 + .../engine/parser/AiravataDefaultParser.java | 221 +++++++++++++++++++ .../workflow/engine/parser/PortContainer.java | 32 +++ .../simple/workflow/engine/WorkflowDAGTest.java | 25 +++ 20 files changed, 536 insertions(+), 260 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/e3ce93e1/modules/simple-workflow/pom.xml ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/pom.xml b/modules/simple-workflow/pom.xml index a296417..623934d 100644 --- a/modules/simple-workflow/pom.xml +++ b/modules/simple-workflow/pom.xml @@ -29,6 +29,24 @@ <artifactId>airavata-jpa-registry</artifactId> <version>${project.version}</version> </dependency> + + <!-- Airavata default parser dependency --> + <dependency> + <groupId>org.apache.airavata</groupId> + <artifactId>airavata-workflow-model-core</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.airavata</groupId> + <artifactId>app-catalog-data</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.airavata</groupId> + <artifactId>app-catalog-cpi</artifactId> + <version>${project.version}</version> + </dependency> + <!-- Messaging dependency --> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> http://git-wip-us.apache.org/repos/asf/airavata/blob/e3ce93e1/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/AiravataDefaultParser.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/AiravataDefaultParser.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/AiravataDefaultParser.java deleted file mode 100644 index 3819af4..0000000 --- a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/AiravataDefaultParser.java +++ /dev/null @@ -1,111 +0,0 @@ -package org.apache.ariavata.simple.workflow.engine; - -import org.airavata.appcatalog.cpi.AppCatalogException; -import org.airavata.appcatalog.cpi.WorkflowCatalog; -import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory; -import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; -import org.apache.airavata.model.workspace.experiment.Experiment; -import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory; -import org.apache.airavata.registry.cpi.Registry; -import org.apache.airavata.registry.cpi.RegistryException; -import org.apache.airavata.registry.cpi.RegistryModelType; -import org.apache.airavata.workflow.model.component.ComponentException; -import org.apache.airavata.workflow.model.component.system.ConstantComponent; -import org.apache.airavata.workflow.model.component.system.InputComponent; -import org.apache.airavata.workflow.model.component.system.S3InputComponent; -import org.apache.airavata.workflow.model.graph.GraphException; -import org.apache.airavata.workflow.model.graph.Node; -import org.apache.airavata.workflow.model.graph.impl.NodeImpl; -import org.apache.airavata.workflow.model.wf.Workflow; -import org.apache.ariavata.simple.workflow.engine.dag.edge.Edge; -import org.apache.ariavata.simple.workflow.engine.dag.nodes.WorkflowInputNode; -import org.apache.ariavata.simple.workflow.engine.dag.nodes.WorkflowNode; -import org.apache.ariavata.simple.workflow.engine.dag.port.OutPort; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -//import org.apache.airavata.model.Workflow; - -/** - * Created by shameera on 2/11/15. - */ -public class AiravataDefaultParser implements WorkflowParser { - - private String experimentId; - private String credentialToken ; - private Workflow workflow; - private Experiment experiment; - - - public AiravataDefaultParser(String experimentId, String credentialToken) { - this.experimentId = experimentId; - this.credentialToken = credentialToken; - } - - @Override - public List<WorkflowInputNode> parse() throws RegistryException, AppCatalogException, - ComponentException, GraphException { - return parseWorkflow(getWorkflowFromExperiment()); - } - - private List<WorkflowInputNode> parseWorkflow(Workflow workflow) { - List<Node> gNodes = getInputNodes(workflow); - List<WorkflowInputNode> wfInputNodes = new ArrayList<WorkflowInputNode>(); - List<WorkflowNode> wfNodes = new ArrayList<WorkflowNode>(); - List<InputDataObjectType> experimentInputs = experiment.getExperimentInputs(); - Map<String,InputDataObjectType> inputDataMap=new HashMap<String, InputDataObjectType>(); - WorkflowInputNode wfInputNode = null; - for (InputDataObjectType dataObjectType : experimentInputs) { - inputDataMap.put(dataObjectType.getName(), dataObjectType); - } - for (Node gNode : gNodes) { - // create a new wfInputNode instance by passing node name and node Id - wfInputNode.setInputObject(inputDataMap.get(wfInputNode.getNodeName())); - if (wfInputNode.getInputObject() == null) { - // TODO: throw an error and exit. - } - Edge edge = null;//= new Edge - OutPort outPort = null; - outPort. - link.setOutPort(null); // new Output - wfInputNodes.add(wfInputNode); - } - - - - - - return null; - } - - private WorkflowInputNode getWorkflowInputNode(Node inputNode) { - // FIXME: create a new workflow input node implementation with input node data. - return null; - } - - private Workflow getWorkflowFromExperiment() throws RegistryException, AppCatalogException, GraphException, ComponentException { - Registry registry = RegistryFactory.getDefaultRegistry(); - experiment = (Experiment)registry.get(RegistryModelType.EXPERIMENT, experimentId); - WorkflowCatalog workflowCatalog = getWorkflowCatalog(); - return new Workflow(workflowCatalog.getWorkflow(experiment.getApplicationId()).getGraph()); - } - - private WorkflowCatalog getWorkflowCatalog() throws AppCatalogException { - return AppCatalogFactory.getAppCatalog().getWorkflowCatalog(); - } - - private ArrayList<Node> getInputNodes(Workflow wf) { - ArrayList<Node> list = new ArrayList<Node>(); - List<NodeImpl> nodes = wf.getGraph().getNodes(); - for (Node node : nodes) { - String name = node.getComponent().getName(); - if (InputComponent.NAME.equals(name) || ConstantComponent.NAME.equals(name) || S3InputComponent.NAME.equals(name)) { - list.add(node); - } - } - return list; - } -} http://git-wip-us.apache.org/repos/asf/airavata/blob/e3ce93e1/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/SimpleWorkflowEngine.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/SimpleWorkflowEngine.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/SimpleWorkflowEngine.java index 5a61874..ff057fb 100644 --- a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/SimpleWorkflowEngine.java +++ b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/SimpleWorkflowEngine.java @@ -22,4 +22,10 @@ package org.apache.ariavata.simple.workflow.engine; public class SimpleWorkflowEngine{ + + + public void invoke () { + WorkflowParser parser = WorkflowFactoryImpl.getInstance().getWorkflowParser(); + + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/e3ce93e1/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/SimpleWorkflowInterpreter.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/SimpleWorkflowInterpreter.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/SimpleWorkflowInterpreter.java index 3266423..b4bc9a2 100644 --- a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/SimpleWorkflowInterpreter.java +++ b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/SimpleWorkflowInterpreter.java @@ -76,7 +76,7 @@ public class SimpleWorkflowInterpreter { } - public void launchWorkflow() { + public void launchWorkflow() throws Exception { // process workflow input nodes processWorkflowInputNodes(getWorkflowInputNodes()); processReadyList(); @@ -151,10 +151,10 @@ public class SimpleWorkflowInterpreter { Set<WorkflowNode> tempNodeSet = new HashSet<WorkflowNode>(); for (WorkflowInputNode wfInputNode : wfInputNodes) { if (wfInputNode.isSatisfy()) { - for (Edge edge : wfInputNode.getOutputLinks()) { - WorkflowUtil.copyValues(wfInputNode.getInputObject(), edge.getInPort().getInputObject()); - tempNodeSet.add(edge.getInPort().getNode()); - } +// for (Edge edge : wfInputNode.getOutputLinks()) { +// WorkflowUtil.copyValues(wfInputNode.getInputObject(), edge.getToPort().getInputObject()); +// tempNodeSet.add(edge.getToPort().getNode()); +// } } } for (WorkflowNode workflowNode : tempNodeSet) { @@ -167,7 +167,7 @@ public class SimpleWorkflowInterpreter { } - public List<WorkflowInputNode> getWorkflowInputNodes() { + public List<WorkflowInputNode> getWorkflowInputNodes() throws Exception { if (workflowInputNodes == null) { // read workflow description from registry and parse it WorkflowFactoryImpl wfFactory = WorkflowFactoryImpl.getInstance(); @@ -215,10 +215,10 @@ public class SimpleWorkflowInterpreter { ApplicationNode applicationNode = (ApplicationNode) workflowNode; // Workflow node can have one to many output ports and each output port can have one to many links for (OutPort outPort : applicationNode.getOutputPorts()) { - for (Edge edge : outPort.getOutputLinks()) { - WorkflowUtil.copyValues(outPort.getOutputObject(), edge.getInPort().getInputObject()); - tempWfNodeSet.add(edge.getInPort().getNode()); - } +// for (Edge edge : outPort.getOutputLinks()) { +// WorkflowUtil.copyValues(outPort.getOutputObject(), edge.getToPort().getInputObject()); +// tempWfNodeSet.add(edge.getToPort().getNode()); +// } } for (WorkflowNode node : tempWfNodeSet) { http://git-wip-us.apache.org/repos/asf/airavata/blob/e3ce93e1/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/edge/DirectedEdge.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/edge/DirectedEdge.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/edge/DirectedEdge.java new file mode 100644 index 0000000..4b05740 --- /dev/null +++ b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/edge/DirectedEdge.java @@ -0,0 +1,31 @@ +package org.apache.ariavata.simple.workflow.engine.dag.edge; + +import org.apache.ariavata.simple.workflow.engine.dag.port.InPort; +import org.apache.ariavata.simple.workflow.engine.dag.port.OutPort; + + +public class DirectedEdge implements Edge { + + private InPort inPort; + private OutPort outPort; + + @Override + public InPort getToPort() { + return null; // TODO: Auto generated method body. + } + + @Override + public void setToPort(InPort inPort) { + // TODO: Auto generated method body. + } + + @Override + public OutPort getFromPort() { + return null; // TODO: Auto generated method body. + } + + @Override + public void setFromPort(OutPort outPort) { + // TODO: Auto generated method body. + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/e3ce93e1/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/edge/DirectedLink.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/edge/DirectedLink.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/edge/DirectedLink.java deleted file mode 100644 index bff9079..0000000 --- a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/edge/DirectedLink.java +++ /dev/null @@ -1,40 +0,0 @@ -/* -package org.apache.ariavata.simple.workflow.engine.dag.links; - -import org.apache.ariavata.simple.workflow.engine.dag.nodes.WorkflowNode; - -*/ -/** - * Created by shameera on 1/29/15. - *//* - -public class DirectedLink implements Edge{ - - private WorkflowNode _fromNode; - - private WorkflowNode _toNode; - - public DirectedLink(WorkflowNode _fromNode, WorkflowNode _toNode) { - this._fromNode = _fromNode; - this._toNode = _toNode; - } - - @Override - public WorkflowNode fromNode() { - return null; - } - - @Override - public WorkflowNode toNode() { - return null; - } - - public void set_fromNode(WorkflowNode _fromNode) { - this._fromNode = _fromNode; - } - - public void set_toNode(WorkflowNode _toNode) { - this._toNode = _toNode; - } -} -*/ http://git-wip-us.apache.org/repos/asf/airavata/blob/e3ce93e1/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/edge/Edge.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/edge/Edge.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/edge/Edge.java index ed86b67..667a01d 100644 --- a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/edge/Edge.java +++ b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/edge/Edge.java @@ -7,29 +7,19 @@ import org.apache.ariavata.simple.workflow.engine.dag.port.InPort; import org.apache.ariavata.simple.workflow.engine.dag.port.OutPort; /** - * Created by shameera on 1/29/15. + * Edge is a link to one node to another, basically edge should have outPort of a workflow node , + * which is starting point and inPort of a workflow node, which is end point of the edge. */ -public interface Edge { - -// public WorkflowNode fromNode(); - -// public WorkflowNode toNode(); - -/* public InputDataObjectType getInputObject(); - public void setInputObject(); - - public OutputDataObjectType getOutputObject(); - - public void setOutputObject();*/ +public interface Edge { - public InPort getInPort(); + public InPort getToPort(); - public void setInPort(InPort inPort); + public void setToPort(InPort inPort); - public OutPort getOutPort(); + public OutPort getFromPort(); - public void setOutPort(OutPort outPort); + public void setFromPort(OutPort outPort); } http://git-wip-us.apache.org/repos/asf/airavata/blob/e3ce93e1/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/ApplicationNode.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/ApplicationNode.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/ApplicationNode.java index daa0038..cd2a955 100644 --- a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/ApplicationNode.java +++ b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/ApplicationNode.java @@ -28,14 +28,14 @@ import java.util.List; public interface ApplicationNode extends WorkflowNode { - public void addInputPort(InPort inPort); + public String getApplicationId(); + +// public void addInputPort(InPort inPort); public List<InPort> getInputPorts(); - public void addOutputPort(OutPort outPort); +// public void addOutputPort(OutPort outPort); public List<OutPort> getOutputPorts(); - public String getApplicationId(); - } http://git-wip-us.apache.org/repos/asf/airavata/blob/e3ce93e1/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/ApplicationNodeImpl.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/ApplicationNodeImpl.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/ApplicationNodeImpl.java index 4f4f6f4..9388c43 100644 --- a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/ApplicationNodeImpl.java +++ b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/ApplicationNodeImpl.java @@ -17,27 +17,30 @@ * specific language governing permissions and limitations * under the License. * - *//* + */ package org.apache.ariavata.simple.workflow.engine.dag.nodes; -import org.apache.ariavata.simple.workflow.engine.dag.links.Edge; -import org.apache.ariavata.simple.workflow.engine.dag.port.InputPort; -import org.apache.ariavata.simple.workflow.engine.dag.port.OutputPort; -import java.util.ArrayList; +import org.apache.ariavata.simple.workflow.engine.dag.port.InPort; +import org.apache.ariavata.simple.workflow.engine.dag.port.OutPort; + import java.util.List; public class ApplicationNodeImpl implements ApplicationNode { private final String nodeId; private NodeState myState = NodeState.WAITING; - private List<Edge> inputLinks = new ArrayList<Edge>(); - private List<Edge> outputLinks = new ArrayList<Edge>(); + private String applicationId; public ApplicationNodeImpl(String nodeId) { + this(nodeId, null); + } + + public ApplicationNodeImpl(String nodeId, String applicationId) { this.nodeId = nodeId; + this.applicationId = applicationId; } @Override @@ -46,6 +49,11 @@ public class ApplicationNodeImpl implements ApplicationNode { } @Override + public String getNodeName() { + return null; // TODO: Auto generated method body. + } + + @Override public NodeType getNodeType() { return NodeType.APPLICATION; } @@ -62,47 +70,22 @@ public class ApplicationNodeImpl implements ApplicationNode { } @Override - public void addInputPort(InputPort inputPort) { - + public boolean isSatisfy() { + return false; // TODO: Auto generated method body. } @Override - public List<InputPort> getInputPorts() { - return null; + public String getApplicationId() { + return null; // TODO: Auto generated method body. } @Override - public void addOutputPort(OutputPort outputPort) { - + public List<InPort> getInputPorts() { + return null; // TODO: Auto generated method body. } @Override - public List<OutputPort> getOutputPorts() { - return null; - } - - public List<Edge> getInputLinks() { - return inputLinks; - } - - public List<Edge> getOutputLinks() { - return outputLinks; - } - - public void setInputLinks(List<Edge> inputLinks) { - this.inputLinks = inputLinks; - } - - public void setOutputLinks(List<Edge> outputLinks) { - this.outputLinks = outputLinks; - } - - public void addInputLink(Edge inputLink) { - inputLinks.add(inputLink); - } - - public void addOutputLink(Edge outputLink) { - outputLinks.add(outputLink); + public List<OutPort> getOutputPorts() { + return null; // TODO: Auto generated method body. } } -*/ http://git-wip-us.apache.org/repos/asf/airavata/blob/e3ce93e1/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowInputNode.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowInputNode.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowInputNode.java index 55b19da..8d24c96 100644 --- a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowInputNode.java +++ b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowInputNode.java @@ -23,6 +23,7 @@ package org.apache.ariavata.simple.workflow.engine.dag.nodes; import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; import org.apache.ariavata.simple.workflow.engine.dag.edge.Edge; +import org.apache.ariavata.simple.workflow.engine.dag.port.OutPort; import java.util.List; @@ -32,10 +33,6 @@ public interface WorkflowInputNode extends WorkflowNode { public void setInputObject(InputDataObjectType inputObject); - public void addOutputLink(Edge outputEdge); - - public void addOutputLink(List<Edge> outputEdges); - - public List<Edge> getOutputLinks(); + public OutPort getOutPort(); } http://git-wip-us.apache.org/repos/asf/airavata/blob/e3ce93e1/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java index 54ea438..31bd6b0 100644 --- a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java +++ b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java @@ -1,5 +1,4 @@ /* - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -16,24 +15,29 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - * - *//* + */ + package org.apache.ariavata.simple.workflow.engine.dag.nodes; import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; -import org.apache.ariavata.simple.workflow.engine.dag.port.OutputPort; +import org.apache.ariavata.simple.workflow.engine.dag.port.OutPort; -import java.util.List; public class WorkflowInputNodeImpl implements WorkflowInputNode { private NodeState myState = NodeState.READY; private final String nodeId; + private String nodeName; public WorkflowInputNodeImpl(String nodeId) { + this(nodeId, null); + } + + public WorkflowInputNodeImpl(String nodeId, String nodeName) { this.nodeId = nodeId; + this.nodeName = nodeName; } @Override @@ -42,6 +46,11 @@ public class WorkflowInputNodeImpl implements WorkflowInputNode { } @Override + public String getNodeName() { + return null; // TODO: Auto generated method body. + } + + @Override public NodeType getNodeType() { return NodeType.WORKFLOW_INPUT; } @@ -68,14 +77,14 @@ public class WorkflowInputNodeImpl implements WorkflowInputNode { } @Override - public void addOutputPort(OutputPort outputPort) { + public void setInputObject(InputDataObjectType inputObject) { // TODO: Auto generated method body. } @Override - public List<OutputPort> getOutputPorts() { + public OutPort getOutPort() { return null; // TODO: Auto generated method body. } + } -*/ http://git-wip-us.apache.org/repos/asf/airavata/blob/e3ce93e1/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowOutputNodeImpl.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowOutputNodeImpl.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowOutputNodeImpl.java index 5978884..36ab1f6 100644 --- a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowOutputNodeImpl.java +++ b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowOutputNodeImpl.java @@ -17,22 +17,28 @@ * specific language governing permissions and limitations * under the License. * - *//* + */ package org.apache.ariavata.simple.workflow.engine.dag.nodes; -import org.apache.ariavata.simple.workflow.engine.dag.port.InputPort; -import java.util.List; +import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType; +import org.apache.ariavata.simple.workflow.engine.dag.edge.Edge; public class WorkflowOutputNodeImpl implements WorkflowOutputNode { private NodeState myState = NodeState.WAITING; private final String nodeId; + private String nodeName; public WorkflowOutputNodeImpl(String nodeId) { + this(nodeId, null); + } + + public WorkflowOutputNodeImpl(String nodeId, String nodeName) { this.nodeId = nodeId; + this.nodeName = nodeName; } @Override @@ -41,6 +47,11 @@ public class WorkflowOutputNodeImpl implements WorkflowOutputNode { } @Override + public String getNodeName() { + return null; // TODO: Auto generated method body. + } + + @Override public NodeType getNodeType() { return NodeType.WORKFLOW_OUTPUT; } @@ -62,14 +73,13 @@ public class WorkflowOutputNodeImpl implements WorkflowOutputNode { } @Override - public void addInputPort(InputPort inputPort) { - + public OutputDataObjectType getOutputObject() { + return null; // TODO: Auto generated method body. } @Override - public List<InputPort> getInputPorts() { - return null; + public Edge getInputLink() { + return null; // TODO: Auto generated method body. } - } -*/ + http://git-wip-us.apache.org/repos/asf/airavata/blob/e3ce93e1/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/port/InPort.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/port/InPort.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/port/InPort.java index 346c0f7..c635bef 100644 --- a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/port/InPort.java +++ b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/port/InPort.java @@ -30,7 +30,8 @@ public interface InPort extends Port { public InputDataObjectType getInputObject(); - public Edge getInputLink(); + public Edge getEdge(); + + public void addEdge(Edge edge); - public void setInputLink(); } http://git-wip-us.apache.org/repos/asf/airavata/blob/e3ce93e1/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/port/InputPortIml.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/port/InputPortIml.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/port/InputPortIml.java index 59f64bd..b33b91b 100644 --- a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/port/InputPortIml.java +++ b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/port/InputPortIml.java @@ -1,5 +1,4 @@ /* - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -16,29 +15,64 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - * - *//* - - + */ package org.apache.ariavata.simple.workflow.engine.dag.port; import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; -import org.apache.ariavata.simple.workflow.engine.dag.links.Edge; +import org.apache.ariavata.simple.workflow.engine.dag.edge.Edge; +import org.apache.ariavata.simple.workflow.engine.dag.nodes.WorkflowNode; + +import java.util.List; + +public class InputPortIml implements InPort { + + private InputDataObjectType inputDataObjectType; + private boolean isSatisfy = false; + private String portId; + private Edge edge; + + public InputPortIml(String portId) { + this.portId = portId; + } + + @Override + public void setInputObject(InputDataObjectType inputObject) { + // TODO: Auto generated method body. + } -public class InputPortIml implements InputPort { @Override public InputDataObjectType getInputObject() { - return null; + return null; // TODO: Auto generated method body. } @Override - public Edge getInputLink() { - return null; + public Edge getEdge() { + return this.edge; + } + + @Override + public void addEdge(Edge edge) { + this.edge = edge; } @Override public boolean isSatisfy() { - return false; + return false; // TODO: Auto generated method body. + } + + @Override + public WorkflowNode getNode() { + return null; // TODO: Auto generated method body. + } + + @Override + public void setNode(WorkflowNode workflowNode) { + // TODO: Auto generated method body. + } + + @Override + public String getId() { + return null; // TODO: Auto generated method body. } + } -*/ http://git-wip-us.apache.org/repos/asf/airavata/blob/e3ce93e1/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/port/OutPort.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/port/OutPort.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/port/OutPort.java index fa5baa2..04a7e1e 100644 --- a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/port/OutPort.java +++ b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/port/OutPort.java @@ -32,8 +32,8 @@ public interface OutPort extends Port { public OutputDataObjectType getOutputObject(); - public List<Edge> getOutputLinks(); + public List<Edge> getOutEdges(); - public void addOutputLink(); + public void addEdge(Edge edge); } http://git-wip-us.apache.org/repos/asf/airavata/blob/e3ce93e1/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/port/OutPortImpl.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/port/OutPortImpl.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/port/OutPortImpl.java new file mode 100644 index 0000000..bc7628f --- /dev/null +++ b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/port/OutPortImpl.java @@ -0,0 +1,62 @@ +package org.apache.ariavata.simple.workflow.engine.dag.port; + +import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType; +import org.apache.ariavata.simple.workflow.engine.dag.edge.Edge; +import org.apache.ariavata.simple.workflow.engine.dag.nodes.WorkflowNode; + +import java.util.List; + +/** + * Created by shameera on 2/11/15. + */ +public class OutPortImpl implements OutPort { + + private OutputDataObjectType outputDataObjectType; + private List<Edge> outEdges; + private boolean isSatisfy = false; + private String portId; + + public OutPortImpl(String portId) { + this.portId = portId; + } + + @Override + public void setOutputObject(OutputDataObjectType outputObject) { + // TODO: Auto generated method body. + } + + @Override + public OutputDataObjectType getOutputObject() { + return null; // TODO: Auto generated method body. + } + + @Override + public List<Edge> getOutEdges() { + return null; // TODO: Auto generated method body. + } + + @Override + public void addEdge(Edge edge) { + // TODO: Auto generated method body. + } + + @Override + public boolean isSatisfy() { + return false; // TODO: Auto generated method body. + } + + @Override + public WorkflowNode getNode() { + return null; // TODO: Auto generated method body. + } + + @Override + public void setNode(WorkflowNode workflowNode) { + // TODO: Auto generated method body. + } + + @Override + public String getId() { + return null; // TODO: Auto generated method body. + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/e3ce93e1/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/port/Port.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/port/Port.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/port/Port.java index ca8246a..8c5d6c5 100644 --- a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/port/Port.java +++ b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/port/Port.java @@ -21,11 +21,19 @@ package org.apache.ariavata.simple.workflow.engine.dag.port; +import org.apache.ariavata.simple.workflow.engine.dag.edge.Edge; import org.apache.ariavata.simple.workflow.engine.dag.nodes.WorkflowNode; +import java.util.List; + public interface Port { public boolean isSatisfy(); public WorkflowNode getNode(); + + public void setNode(WorkflowNode workflowNode); + + public String getId(); + } http://git-wip-us.apache.org/repos/asf/airavata/blob/e3ce93e1/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/parser/AiravataDefaultParser.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/parser/AiravataDefaultParser.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/parser/AiravataDefaultParser.java new file mode 100644 index 0000000..92e9811 --- /dev/null +++ b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/parser/AiravataDefaultParser.java @@ -0,0 +1,221 @@ +package org.apache.ariavata.simple.workflow.engine.parser; + +import org.airavata.appcatalog.cpi.AppCatalogException; +import org.airavata.appcatalog.cpi.WorkflowCatalog; +import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory; +import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; +import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType; +import org.apache.airavata.model.workspace.experiment.Experiment; +import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory; +import org.apache.airavata.registry.cpi.Registry; +import org.apache.airavata.registry.cpi.RegistryException; +import org.apache.airavata.registry.cpi.RegistryModelType; +import org.apache.airavata.workflow.model.component.ComponentException; +import org.apache.airavata.workflow.model.component.system.ConstantComponent; +import org.apache.airavata.workflow.model.component.system.InputComponent; +import org.apache.airavata.workflow.model.component.system.S3InputComponent; +import org.apache.airavata.workflow.model.graph.DataEdge; +import org.apache.airavata.workflow.model.graph.DataPort; +import org.apache.airavata.workflow.model.graph.GraphException; +import org.apache.airavata.workflow.model.graph.Node; +import org.apache.airavata.workflow.model.graph.impl.NodeImpl; +import org.apache.airavata.workflow.model.graph.system.OutputNode; +import org.apache.airavata.workflow.model.graph.ws.WSNode; +import org.apache.airavata.workflow.model.graph.ws.WSPort; +import org.apache.airavata.workflow.model.wf.Workflow; +import org.apache.ariavata.simple.workflow.engine.WorkflowParser; +import org.apache.ariavata.simple.workflow.engine.dag.edge.DirectedEdge; +import org.apache.ariavata.simple.workflow.engine.dag.edge.Edge; +import org.apache.ariavata.simple.workflow.engine.dag.nodes.ApplicationNode; +import org.apache.ariavata.simple.workflow.engine.dag.nodes.ApplicationNodeImpl; +import org.apache.ariavata.simple.workflow.engine.dag.nodes.WorkflowInputNode; +import org.apache.ariavata.simple.workflow.engine.dag.nodes.WorkflowInputNodeImpl; +import org.apache.ariavata.simple.workflow.engine.dag.nodes.WorkflowNode; +import org.apache.ariavata.simple.workflow.engine.dag.port.InPort; +import org.apache.ariavata.simple.workflow.engine.dag.port.InputPortIml; +import org.apache.ariavata.simple.workflow.engine.dag.port.OutPort; +import org.apache.ariavata.simple.workflow.engine.dag.port.OutPortImpl; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +//import org.apache.airavata.model.Workflow; + +/** + * Created by shameera on 2/11/15. + */ +public class AiravataDefaultParser implements WorkflowParser { + + private String experimentId; + private String credentialToken ; + private Workflow workflow; + private Experiment experiment; + private Map<String, ApplicationNode> wfNodes = new HashMap<String, ApplicationNode>(); + + + public AiravataDefaultParser(String experimentId, String credentialToken) { + this.experimentId = experimentId; + this.credentialToken = credentialToken; + } + + @Override + public List<WorkflowInputNode> parse() throws RegistryException, AppCatalogException, + ComponentException, GraphException { + return parseWorkflow(getWorkflowFromExperiment()); + } + + private List<WorkflowInputNode> parseWorkflow(Workflow workflow) { + List<Node> gNodes = getInputNodes(workflow); + List<WorkflowInputNode> wfInputNodes = new ArrayList<WorkflowInputNode>(); + List<PortContainer> portContainers = new ArrayList<PortContainer>(); + List<InputDataObjectType> experimentInputs = experiment.getExperimentInputs(); + Map<String,InputDataObjectType> inputDataMap=new HashMap<String, InputDataObjectType>(); + WorkflowInputNode wfInputNode = null; + for (InputDataObjectType dataObjectType : experimentInputs) { + inputDataMap.put(dataObjectType.getName(), dataObjectType); + } + OutPort outPort = null; + InPort inPort = null; + Edge edge = null; + for (Node gNode : gNodes) { + wfInputNode = new WorkflowInputNodeImpl(gNode.getID(), gNode.getName()); + wfInputNode.setInputObject(inputDataMap.get(wfInputNode.getNodeName())); + if (wfInputNode.getInputObject() == null) { + // TODO: throw an error and exit. + } + for (DataPort dataPort : gNode.getInputPorts()) { + outPort = new OutPortImpl(dataPort.getID()); + for (DataEdge dataEdge : dataPort.getEdges()) { + edge = new DirectedEdge(); + edge.setFromPort(outPort); + outPort.addEdge(edge); + inPort = getInPort(dataEdge.getToPort()); + edge.setToPort(inPort); + inPort.addEdge(edge); + portContainers.add(new PortContainer(dataEdge.getToPort(), inPort)); + } + outPort.setOutputObject(getOutputDataObject(wfInputNode.getInputObject())); + } + wfInputNodes.add(wfInputNode); + } + + // while port container empty iterate graph and build the workflow DAG. + buildModel(portContainers); + + return wfInputNodes; + } + + private void buildModel(List<PortContainer> portContainerList) { + // end condition of recursive call. + if (portContainerList == null || portContainerList.size() == 0) { + return ; + } + DataPort dataPort = null; + InPort inPort = null; + WorkflowNode wfNode = null; + List<PortContainer> nextPortContainerList = new ArrayList<PortContainer>(); + for (PortContainer portContainer : portContainerList) { + dataPort = portContainer.getDataPort(); + inPort = portContainer.getInPort(); + Node node = dataPort.getNode(); + inPort.setInputObject(getInputDataObject(dataPort)); + if (node instanceof WSNode) { + WSNode wsNode = (WSNode) node; + wfNode = wfNodes.get(wsNode.getID()); + if (wfNode == null) { + wfNode = new ApplicationNodeImpl(wsNode.getID(), + wsNode.getComponent().getApplication().getApplicationId()); + nextPortContainerList.addAll(processOutPorts(wsNode, wfNode)); + } + }else if (node instanceof OutputNode) { + OutputNode oNode = (OutputNode) node; + wfNode = new WorkflowInputNodeImpl(oNode.getID(), oNode.getName()); + } + inPort.setNode(wfNode); + buildModel(nextPortContainerList); + // set the workflow node to inPort + // if require check the types of inputs and output ports, + // add outputPorts to the workflow node + // add edges to each output port + // add inport and indataport to the list + // recursively call the function. + } + + } + + private List<PortContainer> processOutPorts(Node node, WorkflowNode wfNode) { + OutPort outPort ; + Edge edge; + InPort inPort; + List<PortContainer> portContainers = new ArrayList<PortContainer>(); + for (DataPort dataPort : node.getOutputPorts()) { + outPort = new OutPortImpl(dataPort.getID()); + for (DataEdge dataEdge : dataPort.getEdges()) { + edge = new DirectedEdge(); + edge.setFromPort(outPort); + outPort.addEdge(edge); + inPort = getInPort(dataEdge.getToPort()); + edge.setToPort(inPort); + inPort.addEdge(edge); + portContainers.add(new PortContainer(dataEdge.getToPort(), inPort)); + } + } + return portContainers; + } + + private InPort getInPort(DataPort toPort) { + return new InputPortIml(toPort.getID()); + } + + private InputDataObjectType getInputDataObject(DataPort dataPort) { + InputDataObjectType inputDataObject = new InputDataObjectType(); + inputDataObject.setName(dataPort.getName()); + if (dataPort instanceof WSPort) { + WSPort port = (WSPort) dataPort; + inputDataObject.setInputOrder(port.getComponentPort().getInputOrder()); + inputDataObject.setApplicationArgument(port.getComponentPort().getApplicationArgument() == null ? + "" : port.getComponentPort().getApplicationArgument()); + inputDataObject.setType(dataPort.getType()); + } + return inputDataObject; + } + + private OutputDataObjectType getOutputDataObject(InputDataObjectType inputObject) { + OutputDataObjectType outputDataObjectType = new OutputDataObjectType(); + outputDataObjectType.setApplicationArgument(inputObject.getApplicationArgument()); + outputDataObjectType.setName(inputObject.getName()); + outputDataObjectType.setType(inputObject.getType()); + outputDataObjectType.setValue(inputObject.getValue()); + return outputDataObjectType; + } + + private WorkflowInputNode getWorkflowInputNode(Node inputNode) { + // FIXME: create a new workflow input node implementation with input node data. + return null; + } + + private Workflow getWorkflowFromExperiment() throws RegistryException, AppCatalogException, GraphException, ComponentException { + Registry registry = RegistryFactory.getDefaultRegistry(); + experiment = (Experiment)registry.get(RegistryModelType.EXPERIMENT, experimentId); + WorkflowCatalog workflowCatalog = getWorkflowCatalog(); + return new Workflow(workflowCatalog.getWorkflow(experiment.getApplicationId()).getGraph()); + } + + private WorkflowCatalog getWorkflowCatalog() throws AppCatalogException { + return AppCatalogFactory.getAppCatalog().getWorkflowCatalog(); + } + + private ArrayList<Node> getInputNodes(Workflow wf) { + ArrayList<Node> list = new ArrayList<Node>(); + List<NodeImpl> nodes = wf.getGraph().getNodes(); + for (Node node : nodes) { + String name = node.getComponent().getName(); + if (InputComponent.NAME.equals(name) || ConstantComponent.NAME.equals(name) || S3InputComponent.NAME.equals(name)) { + list.add(node); + } + } + return list; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/e3ce93e1/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/parser/PortContainer.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/parser/PortContainer.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/parser/PortContainer.java new file mode 100644 index 0000000..292bd1f --- /dev/null +++ b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/parser/PortContainer.java @@ -0,0 +1,32 @@ +package org.apache.ariavata.simple.workflow.engine.parser; + +import org.apache.airavata.workflow.model.graph.DataPort; +import org.apache.ariavata.simple.workflow.engine.dag.port.InPort; + + +public class PortContainer { + private DataPort dataPort; + private InPort inPort; + + + public PortContainer(DataPort dataPort, InPort inPort) { + this.dataPort = dataPort; + this.inPort = inPort; + } + + public DataPort getDataPort() { + return dataPort; + } + + public void setDataPort(DataPort dataPort) { + this.dataPort = dataPort; + } + + public InPort getInPort() { + return inPort; + } + + public void setInPort(InPort inPort) { + this.inPort = inPort; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/e3ce93e1/modules/simple-workflow/src/test/java/org/apache/ariavata/simple/workflow/engine/WorkflowDAGTest.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/test/java/org/apache/ariavata/simple/workflow/engine/WorkflowDAGTest.java b/modules/simple-workflow/src/test/java/org/apache/ariavata/simple/workflow/engine/WorkflowDAGTest.java new file mode 100644 index 0000000..4e3a120 --- /dev/null +++ b/modules/simple-workflow/src/test/java/org/apache/ariavata/simple/workflow/engine/WorkflowDAGTest.java @@ -0,0 +1,25 @@ +package org.apache.ariavata.simple.workflow.engine; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.*; + +public class WorkflowDAGTest { + + @Before + public void setUp() throws Exception { + + } + + @After + public void tearDown() throws Exception { + + } + + @Test + public void testWorkflowDAG() throws Exception { + + } +} \ No newline at end of file
