http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/parser/AiravataWorkflowParser.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/parser/AiravataWorkflowParser.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/parser/AiravataWorkflowParser.java new file mode 100644 index 0000000..1e1a5c5 --- /dev/null +++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/parser/AiravataWorkflowParser.java @@ -0,0 +1,291 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +package org.apache.airavata.workflow.core.parser; + +import org.airavata.appcatalog.cpi.AppCatalogException; +import org.airavata.appcatalog.cpi.WorkflowCatalog; +import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory; +import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; +import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType; +import org.apache.airavata.model.workspace.experiment.Experiment; +import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory; +import org.apache.airavata.registry.cpi.Registry; +import org.apache.airavata.registry.cpi.RegistryException; +import org.apache.airavata.registry.cpi.RegistryModelType; +import org.apache.airavata.workflow.core.WorkflowParser; +import org.apache.airavata.workflow.core.dag.edge.DirectedEdge; +import org.apache.airavata.workflow.core.dag.edge.Edge; +import org.apache.airavata.workflow.core.dag.nodes.ApplicationNode; +import org.apache.airavata.workflow.core.dag.nodes.ApplicationNodeImpl; +import org.apache.airavata.workflow.core.dag.nodes.WorkflowInputNode; +import org.apache.airavata.workflow.core.dag.nodes.WorkflowInputNodeImpl; +import org.apache.airavata.workflow.core.dag.nodes.WorkflowNode; +import org.apache.airavata.workflow.core.dag.nodes.WorkflowOutputNode; +import org.apache.airavata.workflow.core.dag.nodes.WorkflowOutputNodeImpl; +import org.apache.airavata.workflow.core.dag.port.InPort; +import org.apache.airavata.workflow.core.dag.port.InputPortIml; +import org.apache.airavata.workflow.core.dag.port.OutPort; +import org.apache.airavata.workflow.core.dag.port.OutPortImpl; +import org.apache.airavata.workflow.model.component.ComponentException; +import org.apache.airavata.workflow.model.component.system.ConstantComponent; +import org.apache.airavata.workflow.model.component.system.InputComponent; +import org.apache.airavata.workflow.model.component.system.S3InputComponent; +import 
org.apache.airavata.workflow.model.graph.DataEdge; +import org.apache.airavata.workflow.model.graph.DataPort; +import org.apache.airavata.workflow.model.graph.GraphException; +import org.apache.airavata.workflow.model.graph.Node; +import org.apache.airavata.workflow.model.graph.impl.NodeImpl; +import org.apache.airavata.workflow.model.graph.system.OutputNode; +import org.apache.airavata.workflow.model.graph.system.SystemDataPort; +import org.apache.airavata.workflow.model.graph.ws.WSNode; +import org.apache.airavata.workflow.model.graph.ws.WSPort; +import org.apache.airavata.workflow.model.wf.Workflow; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +
/**
 * Converts an Airavata {@link Workflow} graph into the workflow-core DAG model.
 *
 * <p>Parsing starts at the graph's input nodes and follows outgoing data edges one level at a
 * time. An {@link ApplicationNode} is created for every {@link WSNode} that is reached and a
 * {@link WorkflowOutputNode} for every {@link OutputNode} that is reached. Those two kinds of
 * node are recorded in the map returned by {@link #getWfNodes()}; the input nodes themselves are
 * only returned from {@link #parseWorkflow(Workflow)}.
 */
public class AiravataWorkflowParser implements WorkflowParser {

    /** Stored by the constructors; not read anywhere in this class. */
    private final String credentialToken;

    /** Experiment whose inputs are bound to the workflow input nodes. */
    private final Experiment experiment;

    /** Application and output nodes created so far, keyed by node id. */
    private final Map<String, WorkflowNode> wfNodes = new HashMap<String, WorkflowNode>();

    /**
     * Creates a parser for the experiment stored in the default registry under the given id.
     *
     * @param experimentId    id used to look the experiment up in the registry
     * @param credentialToken credential token kept with the parser
     * @throws RegistryException if the registry lookup fails
     */
    public AiravataWorkflowParser(String experimentId, String credentialToken) throws RegistryException {
        this.experiment = getExperiment(experimentId);
        this.credentialToken = credentialToken;
    }

    /**
     * Creates a parser for an experiment that has already been loaded.
     *
     * @param experiment      experiment supplying the workflow inputs
     * @param credentialToken credential token kept with the parser
     */
    public AiravataWorkflowParser(Experiment experiment, String credentialToken) {
        this.credentialToken = credentialToken;
        this.experiment = experiment;
    }

    /**
     * Loads the workflow referenced by the experiment's application id from the workflow catalog
     * and parses it with {@link #parseWorkflow(Workflow)}.
     *
     * @return the workflow input nodes, each wired to the rest of the DAG
     */
    @Override
    public List<WorkflowInputNode> parse() throws RegistryException, AppCatalogException,
            ComponentException, GraphException {
        return parseWorkflow(getWorkflowFromExperiment(experiment));
    }

    /**
     * Builds the DAG for the given workflow graph.
     *
     * <p>Each graph input node is matched with the experiment input whose name equals the
     * node id.
     *
     * @param workflow workflow graph to convert
     * @return one {@link WorkflowInputNode} per graph input node, in graph order
     * @throws IllegalStateException if a graph input node has no matching experiment input
     */
    public List<WorkflowInputNode> parseWorkflow(Workflow workflow) {
        List<Node> graphInputNodes = getInputNodes(workflow);

        Map<String, InputDataObjectType> inputsByName = new HashMap<String, InputDataObjectType>();
        List<InputDataObjectType> experimentInputs = experiment.getExperimentInputs();
        // Guard against an experiment whose input list was never set: without this the loop
        // below would fail with a bare NullPointerException instead of the descriptive
        // "input object is not set" error raised further down.
        if (experimentInputs != null) {
            for (InputDataObjectType experimentInput : experimentInputs) {
                inputsByName.put(experimentInput.getName(), experimentInput);
            }
        }

        List<WorkflowInputNode> wfInputNodes = new ArrayList<WorkflowInputNode>();
        List<PortContainer> portContainers = new ArrayList<PortContainer>();
        for (Node graphNode : graphInputNodes) {
            WorkflowInputNode wfInputNode = new WorkflowInputNodeImpl(graphNode.getID(), graphNode.getName());
            InputDataObjectType inputObject = inputsByName.get(wfInputNode.getId());
            if (inputObject == null) {
                throw new IllegalStateException(
                        "Workflow Input object is not set, workflow node id: " + wfInputNode.getId());
            }
            wfInputNode.setInputObject(inputObject);
            portContainers.addAll(processOutPorts(graphNode, wfInputNode));
            wfInputNodes.add(wfInputNode);
        }

        // Walk the rest of the graph from the ports the input nodes feed into.
        buildModel(portContainers);

        return wfInputNodes;
    }

    /**
     * Attaches every pending in-port to its DAG node, creating nodes on first visit, and keeps
     * going level by level until no newly created application node yields further ports.
     *
     * <p>Implemented as a loop rather than recursion so that the call stack does not grow with
     * the number of levels in the workflow.
     *
     * @param portContainerList in-ports discovered so far; {@code null} or empty ends the walk
     */
    private void buildModel(List<PortContainer> portContainerList) {
        List<PortContainer> currentLevel = portContainerList;
        while (currentLevel != null && !currentLevel.isEmpty()) {
            List<PortContainer> nextLevel = new ArrayList<PortContainer>();
            for (PortContainer portContainer : currentLevel) {
                DataPort dataPort = portContainer.getDataPort();
                InPort inPort = portContainer.getInPort();
                Node node = dataPort.getNode();
                if (node instanceof WSNode) {
                    ApplicationNode applicationNode = resolveApplicationNode((WSNode) node, nextLevel);
                    inPort.setNode(applicationNode);
                    applicationNode.addInPort(inPort);
                } else if (node instanceof OutputNode) {
                    WorkflowOutputNode outputNode = createWorkflowOutputNode((OutputNode) node);
                    outputNode.setInPort(inPort);
                    inPort.setNode(outputNode);
                    wfNodes.put(outputNode.getId(), outputNode);
                }
                // Ports that belong to any other node type are left unattached.
            }
            currentLevel = nextLevel;
        }
    }

    /**
     * Returns the application node registered for the given graph node, creating and registering
     * it on first visit. When a node is created, the in-ports reachable from its output ports
     * are appended to {@code discovered}.
     *
     * @throws IllegalArgumentException if the id is already registered for a non-application node
     */
    private ApplicationNode resolveApplicationNode(WSNode wsNode, List<PortContainer> discovered) {
        WorkflowNode existing = wfNodes.get(wsNode.getID());
        if (existing == null) {
            ApplicationNode created = createApplicationNode(wsNode);
            wfNodes.put(created.getId(), created);
            discovered.addAll(processOutPorts(wsNode, created));
            return created;
        }
        if (existing instanceof ApplicationNode) {
            return (ApplicationNode) existing;
        }
        throw new IllegalArgumentException("Only support for ApplicationNode implementation, but found other type for node implementation");
    }

    /** Creates a DAG output node carrying the graph node's id, name and parameter type. */
    private WorkflowOutputNode createWorkflowOutputNode(OutputNode oNode) {
        WorkflowOutputNodeImpl workflowOutputNode = new WorkflowOutputNodeImpl(oNode.getID(), oNode.getName());
        OutputDataObjectType outputDataObjectType = new OutputDataObjectType();
        outputDataObjectType.setType(oNode.getParameterType());
        workflowOutputNode.setOutputObject(outputDataObjectType);
        return workflowOutputNode;
    }

    /** Creates a DAG application node from the graph node's id and its component's application. */
    private ApplicationNode createApplicationNode(WSNode wsNode) {
        return new ApplicationNodeImpl(wsNode.getID(),
                wsNode.getComponent().getApplication().getName(),
                wsNode.getComponent().getApplication().getApplicationId());
    }

    /**
     * Mirrors the output ports and outgoing data edges of a graph node onto its DAG node.
     *
     * @param node   graph node whose output ports are read
     * @param wfNode DAG node that receives the created out-ports
     * @return one container per outgoing edge, pairing the edge's target graph port with the
     *         in-port created for it
     */
    private List<PortContainer> processOutPorts(Node node, WorkflowNode wfNode) {
        List<PortContainer> portContainers = new ArrayList<PortContainer>();
        for (DataPort dataPort : node.getOutputPorts()) {
            OutPort outPort = createOutPort(dataPort);
            for (DataEdge dataEdge : dataPort.getEdges()) {
                Edge edge = new DirectedEdge();
                edge.setFromPort(outPort);
                outPort.addEdge(edge);
                InPort inPort = createInPort(dataEdge.getToPort());
                edge.setToPort(inPort);
                inPort.addEdge(edge);
                portContainers.add(new PortContainer(dataEdge.getToPort(), inPort));
            }
            outPort.setNode(wfNode);
            if (wfNode instanceof WorkflowInputNode) {
                // An input node holds a single out-port; a later port replaces an earlier one.
                ((WorkflowInputNode) wfNode).setOutPort(outPort);
            } else if (wfNode instanceof ApplicationNode) {
                ((ApplicationNode) wfNode).addOutPort(outPort);
            }
        }
        return portContainers;
    }

    /**
     * Creates an out-port for a graph port. Name and type are filled in only for
     * {@link WSPort} and {@link SystemDataPort}; other port types get an empty output object.
     */
    private OutPort createOutPort(DataPort dataPort) {
        OutPortImpl outPort = new OutPortImpl(dataPort.getID());
        OutputDataObjectType outputDataObjectType = new OutputDataObjectType();
        if (dataPort instanceof WSPort) {
            WSPort wsPort = (WSPort) dataPort;
            outputDataObjectType.setName(wsPort.getComponentPort().getName());
            outputDataObjectType.setType(wsPort.getType());
        } else if (dataPort instanceof SystemDataPort) {
            SystemDataPort sysPort = (SystemDataPort) dataPort;
            outputDataObjectType.setName(sysPort.getFromNode().getName());
            outputDataObjectType.setType(sysPort.getType());
        }
        outPort.setOutputObject(outputDataObjectType);
        return outPort;
    }

    /**
     * Creates an in-port for the target port of a data edge. For a {@link WSPort} the component
     * port's argument, optional flag, input order and default value are copied as well.
     */
    private InPort createInPort(DataPort toPort) {
        InPort inPort = new InputPortIml(toPort.getID());
        InputDataObjectType inputDataObjectType = new InputDataObjectType();
        if (toPort instanceof WSPort) {
            WSPort wsPort = (WSPort) toPort;
            inputDataObjectType.setName(wsPort.getName());
            inputDataObjectType.setType(wsPort.getType());
            inputDataObjectType.setApplicationArgument(wsPort.getComponentPort().getApplicationArgument());
            inputDataObjectType.setIsRequired(!wsPort.getComponentPort().isOptional());
            inputDataObjectType.setInputOrder(wsPort.getComponentPort().getInputOrder());

            inPort.setDefaultValue(wsPort.getComponentPort().getDefaultValue());
        } else if (toPort instanceof SystemDataPort) {
            SystemDataPort sysPort = (SystemDataPort) toPort;
            inputDataObjectType.setName(sysPort.getName());
            inputDataObjectType.setType(sysPort.getType());
        }
        inPort.setInputObject(inputDataObjectType);
        return inPort;
    }

    /**
     * Builds an input data object from a graph port. Not called from within this class.
     */
    private InputDataObjectType getInputDataObject(DataPort dataPort) {
        InputDataObjectType inputDataObject = new InputDataObjectType();
        inputDataObject.setName(dataPort.getName());
        if (dataPort instanceof WSPort) {
            WSPort port = (WSPort) dataPort;
            inputDataObject.setInputOrder(port.getComponentPort().getInputOrder());
            inputDataObject.setApplicationArgument(port.getComponentPort().getApplicationArgument() == null ?
                    "" : port.getComponentPort().getApplicationArgument());
            // NOTE(review): the type is only set for WSPort although getType() is called on the
            // DataPort itself; this may have been meant to sit outside the if. Confirm before
            // putting this helper to use.
            inputDataObject.setType(dataPort.getType());
        }
        return inputDataObject;
    }

    /**
     * Copies argument, name, type and value from an input object into a new output object.
     * Not called from within this class.
     */
    private OutputDataObjectType getOutputDataObject(InputDataObjectType inputObject) {
        OutputDataObjectType outputDataObjectType = new OutputDataObjectType();
        outputDataObjectType.setApplicationArgument(inputObject.getApplicationArgument());
        outputDataObjectType.setName(inputObject.getName());
        outputDataObjectType.setType(inputObject.getType());
        outputDataObjectType.setValue(inputObject.getValue());
        return outputDataObjectType;
    }

    /** Fetches the experiment with the given id from the default registry. */
    private Experiment getExperiment(String experimentId) throws RegistryException {
        Registry registry = RegistryFactory.getDefaultRegistry();
        return (Experiment) registry.get(RegistryModelType.EXPERIMENT, experimentId);
    }

    /** Loads the workflow stored in the workflow catalog under the experiment's application id. */
    private Workflow getWorkflowFromExperiment(Experiment experiment) throws RegistryException, AppCatalogException, GraphException, ComponentException {
        WorkflowCatalog workflowCatalog = getWorkflowCatalog();
        return new Workflow(workflowCatalog.getWorkflow(experiment.getApplicationId()).getGraph());
    }

    private WorkflowCatalog getWorkflowCatalog() throws AppCatalogException {
        return AppCatalogFactory.getAppCatalog().getWorkflowCatalog();
    }

    /**
     * Collects the graph nodes whose component name is that of an input, constant or S3-input
     * component, in graph order.
     */
    private List<Node> getInputNodes(Workflow wf) {
        List<Node> inputNodes = new ArrayList<Node>();
        List<NodeImpl> nodes = wf.getGraph().getNodes();
        for (Node node : nodes) {
            String name = node.getComponent().getName();
            if (InputComponent.NAME.equals(name) || ConstantComponent.NAME.equals(name) || S3InputComponent.NAME.equals(name)) {
                inputNodes.add(node);
            }
        }
        return inputNodes;
    }

    /**
     * Returns the application and output nodes created while parsing, keyed by node id. This is
     * the parser's own map, not a copy.
     */
    public Map<String, WorkflowNode> getWfNodes() {
        return wfNodes;
    }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/parser/PortContainer.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/parser/PortContainer.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/parser/PortContainer.java new file mode 100644 index 0000000..536199c --- /dev/null +++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/parser/PortContainer.java @@ -0,0 +1,53 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +package org.apache.airavata.workflow.core.parser; + +import org.apache.airavata.workflow.core.dag.port.InPort; +import org.apache.airavata.workflow.model.graph.DataPort; + + +
/**
 * Mutable pair that holds a graph-model {@link DataPort} together with a DAG {@link InPort}.
 */
public class PortContainer {

    /** Graph-model port of the pair. */
    private DataPort dataPort;

    /** DAG in-port of the pair. */
    private InPort inPort;

    /**
     * Creates a container holding the two given ports.
     *
     * @param dataPort graph-model port
     * @param inPort   DAG in-port
     */
    public PortContainer(DataPort dataPort, InPort inPort) {
        this.inPort = inPort;
        this.dataPort = dataPort;
    }

    /** @return the graph-model port currently held */
    public DataPort getDataPort() {
        return this.dataPort;
    }

    /** @return the DAG in-port currently held */
    public InPort getInPort() {
        return this.inPort;
    }

    /** Replaces the graph-model port. */
    public void setDataPort(DataPort dataPort) {
        this.dataPort = dataPort;
    }

    /** Replaces the DAG in-port. */
    public void setInPort(InPort inPort) {
        this.inPort = inPort;
    }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/test/java/org/apache/airavata/workflow/core/WorkflowDAGTest.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/test/java/org/apache/airavata/workflow/core/WorkflowDAGTest.java b/modules/workflow/workflow-core/src/test/java/org/apache/airavata/workflow/core/WorkflowDAGTest.java new file mode 100644 index 0000000..09c1416 --- /dev/null +++ b/modules/workflow/workflow-core/src/test/java/org/apache/airavata/workflow/core/WorkflowDAGTest.java @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.workflow.core; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class WorkflowDAGTest { + + @Before + public void setUp() throws Exception { + + } + + @After + public void tearDown() throws Exception { + + } + + @Test + public void testWorkflowDAG() throws Exception { + + + + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/test/java/org/apache/airavata/workflow/core/parser/AiravataWorkflowParserTest.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/test/java/org/apache/airavata/workflow/core/parser/AiravataWorkflowParserTest.java b/modules/workflow/workflow-core/src/test/java/org/apache/airavata/workflow/core/parser/AiravataWorkflowParserTest.java new file mode 100644 index 0000000..f7bffa2 --- /dev/null +++ b/modules/workflow/workflow-core/src/test/java/org/apache/airavata/workflow/core/parser/AiravataWorkflowParserTest.java @@ -0,0 +1,119 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.workflow.core.parser; + +import org.apache.airavata.model.appcatalog.appinterface.DataType; +import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; +import org.apache.airavata.model.workspace.experiment.Experiment; +import org.apache.airavata.workflow.core.dag.nodes.ApplicationNode; +import org.apache.airavata.workflow.core.dag.nodes.WorkflowInputNode; +import org.apache.airavata.workflow.core.dag.nodes.WorkflowNode; +import org.apache.airavata.workflow.core.dag.nodes.WorkflowOutputNode; +import org.apache.airavata.workflow.model.wf.Workflow; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class AiravataWorkflowParserTest { + + @Before + public void setUp() throws Exception { + + } + + @After + public void tearDown() throws Exception { + + } + + @Test + public void testWorkflowParse() throws Exception { + Assert.assertNotNull("Test file (ComplexMathWorkflow.awf) is missing", getClass().getResource("/ComplexMathWorkflow.awf")); + InputStreamReader isr = new InputStreamReader(this.getClass().getResourceAsStream("/ComplexMathWorkflow.awf")); + BufferedReader br = new BufferedReader(isr); + StringBuffer sb = new StringBuffer(); + String nextLine = br.readLine(); + while (nextLine != null) { + sb.append(nextLine); + nextLine = br.readLine(); + } + Workflow workflow = new Workflow(sb.toString()); + Experiment experiment = new Experiment(); + InputDataObjectType x = new InputDataObjectType(); + x.setValue("6"); + x.setType(DataType.STRING); + x.setName("x"); + + InputDataObjectType y = new InputDataObjectType(); + y.setValue("8"); + y.setType(DataType.STRING); + y.setName("y"); + + InputDataObjectType 
z = new InputDataObjectType(); + z.setValue("10"); + z.setType(DataType.STRING); + z.setName("y_2"); + + List<InputDataObjectType> inputs = new ArrayList<InputDataObjectType>(); + inputs.add(x); + inputs.add(y); + inputs.add(z); + experiment.setExperimentInputs(inputs); + // create parser + AiravataWorkflowParser parser = new AiravataWorkflowParser(experiment, "testCredentialId"); + List<WorkflowInputNode> workflowInputNodes = parser.parseWorkflow(workflow); + Assert.assertNotNull(workflowInputNodes); + Assert.assertEquals(3, workflowInputNodes.size()); + for (WorkflowInputNode workflowInputNode : workflowInputNodes) { + Assert.assertNotNull(workflowInputNode.getOutPort()); + Assert.assertNotNull(workflowInputNode.getInputObject()); + } + + Map<String, WorkflowNode> wfNodes = parser.getWfNodes(); + for (String wfId : wfNodes.keySet()) { + WorkflowNode wfNode = wfNodes.get(wfId); + if (wfNode instanceof ApplicationNode) { + ApplicationNode node = (ApplicationNode) wfNode; + Assert.assertEquals(2, node.getInputPorts().size()); + Assert.assertNotNull(node.getInputPorts().get(0).getInputObject()); + Assert.assertNotNull(node.getInputPorts().get(1).getInputObject()); + Assert.assertNotNull(node.getInputPorts().get(0).getEdge()); + Assert.assertNotNull(node.getInputPorts().get(1).getEdge()); + + Assert.assertEquals(1, node.getOutputPorts().size()); + Assert.assertEquals(1, node.getOutputPorts().get(0).getOutEdges().size()); + Assert.assertNotNull(node.getOutputPorts().get(0).getOutEdges().get(0)); + } else if (wfNode instanceof WorkflowOutputNode) { + WorkflowOutputNode workflowOutputNode = (WorkflowOutputNode) wfNode; + Assert.assertNotNull(workflowOutputNode.getInPort()); + } + } + + } +} \ No newline at end of file
