Added TestWorkflow and fixed set of compilatin issues
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/d1bb3827 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/d1bb3827 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/d1bb3827 Branch: refs/heads/master Commit: d1bb38275f8caa2204ecbe1611b0fbe9c6e322b5 Parents: 14a566a Author: Shameera Rathnayaka <[email protected]> Authored: Wed Feb 3 16:42:21 2016 -0500 Committer: Shameera Rathnayaka <[email protected]> Committed: Wed Feb 3 16:42:21 2016 -0500 ---------------------------------------------------------------------- .../core/SimpleWorkflowInterpreter.java | 35 +++--- .../airavata/workflow/core/WorkflowFactory.java | 5 - .../workflow/core/dag/edge/DirectedEdge.java | 5 + .../airavata/workflow/core/dag/edge/Edge.java | 2 + .../workflow/core/dag/port/OutPort.java | 2 +- .../workflow/core/dag/port/OutPortImpl.java | 2 +- .../core/parser/AiravataWorkflowBuilder.java | 117 ------------------- .../core/parser/JsonWorkflowParser.java | 6 +- .../core/parser/JsonWorkflowParserTest.java | 4 +- .../src/test/resources/TestWorkflow.json | 85 ++++++++++++++ 10 files changed, 123 insertions(+), 140 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/d1bb3827/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/SimpleWorkflowInterpreter.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/SimpleWorkflowInterpreter.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/SimpleWorkflowInterpreter.java index 01ad6bb..7f8a8a5 100644 --- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/SimpleWorkflowInterpreter.java +++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/SimpleWorkflowInterpreter.java @@ -67,6 +67,7 @@ class SimpleWorkflowInterpreter{ private String gatewayName; + private String workflowString; private Map<String, WorkflowNode> readyList = new ConcurrentHashMap<String, WorkflowNode>(); private Map<String, WorkflowNode> waitingList = new ConcurrentHashMap<String, WorkflowNode>(); private Map<String, WorkflowContext> processingQueue = new ConcurrentHashMap<String, WorkflowContext>(); @@ -97,10 +98,12 @@ class SimpleWorkflowInterpreter{ * @throws Exception */ void launchWorkflow() throws Exception { - WorkflowBuilder workflowBuilder = WorkflowFactory.getWorkflowBuilder(experiment.getExperimentId(), credentialToken, null); - +// WorkflowBuilder workflowBuilder = WorkflowFactory.getWorkflowBuilder(experiment.getExperimentId(), credentialToken, null); + workflowString = getWorkflow(); + WorkflowParser workflowParser = WorkflowFactory.getWorkflowParser(workflowString); log.debug("Initialized workflow parser"); - setInputNodes(workflowBuilder.build()); + workflowParser.parse(); + setInputNodes(workflowParser.getInputNodes()); log.debug("Parsed the workflow and got the workflow input nodes"); // process workflow input nodes processWorkflowInputNodes(getInputNodes()); @@ -117,6 +120,12 @@ class SimpleWorkflowInterpreter{ processReadyList(); } + private String getWorkflow() throws AppCatalogException { + WorkflowCatalog workflowCatalog = RegistryFactory.getAppCatalog().getWorkflowCatalog(); + //FIXME: parse workflowTemplateId or experimentId + workflowCatalog.getWorkflow(""); + } + // try to remove synchronization tag /** * Package-Private method. @@ -129,9 +138,9 @@ class SimpleWorkflowInterpreter{ } for (WorkflowNode readyNode : readyList.values()) { if (readyNode instanceof OutputNode) { - OutputNode wfOutputNode = (OutputNode) readyNode; - wfOutputNode.getOutputObject().setValue(wfOutputNode.getInPort().getInputObject().getValue()); - addToCompleteOutputNodeList(wfOutputNode); + OutputNode outputNode = (OutputNode) readyNode; + outputNode.getOutputObject().setValue(outputNode.getInPort().getInputObject().getValue()); + addToCompleteOutputNodeList(outputNode); continue; } WorkflowNodeDetails workflowNodeDetails = createWorkflowNodeDetails(readyNode); @@ -206,13 +215,13 @@ class SimpleWorkflowInterpreter{ } - private void processWorkflowInputNodes(List<InputNode> wfInputNodes) { + private void processWorkflowInputNodes(List<InputNode> inputNodes) { Set<WorkflowNode> tempNodeSet = new HashSet<WorkflowNode>(); - for (InputNode wfInputNode : wfInputNodes) { - if (wfInputNode.isReady()) { - log.debug("Workflow node : " + wfInputNode.getId() + " is ready to execute"); - for (Edge edge : wfInputNode.getOutPort().getOutEdges()) { - edge.getToPort().getInputObject().setValue(wfInputNode.getInputObject().getValue()); + for (InputNode inputNode : inputNodes) { + if (inputNode.isReady()) { + log.debug("Workflow node : " + inputNode.getId() + " is ready to execute"); + for (Edge edge : inputNode.getOutPort().getEdges()) { + edge.getToPort().getInputObject().setValue(inputNode.getInputObject().getValue()); if (edge.getToPort().getNode().isReady()) { addToReadyQueue(edge.getToPort().getNode()); log.debug("Added workflow node : " + edge.getToPort().getNode().getId() + " to the readyQueue"); @@ -315,7 +324,7 @@ class SimpleWorkflowInterpreter{ break; } } - for (Edge edge : outPort.getOutEdges()) { + for (Edge edge : outPort.getEdges()) { edge.getToPort().getInputObject().setValue(outPort.getOutputObject().getValue()); if (edge.getToPort().getNode().isReady()) { addToReadyQueue(edge.getToPort().getNode()); http://git-wip-us.apache.org/repos/asf/airavata/blob/d1bb3827/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactory.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactory.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactory.java index f232efa..e06fab5 100644 --- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactory.java +++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactory.java @@ -37,11 +37,6 @@ public class WorkflowFactory { private static final Logger log = LoggerFactory.getLogger(WorkflowFactory.class); - - public static WorkflowBuilder getWorkflowBuilder(String experimentId, String credentialToken, String workflowString) throws Exception { - return new AiravataWorkflowBuilder(experimentId, credentialToken, getWorkflowParser(workflowString)); - } - public static WorkflowParser getWorkflowParser(String workflowString) throws Exception { WorkflowParser workflowParser = null; try { http://git-wip-us.apache.org/repos/asf/airavata/blob/d1bb3827/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/edge/DirectedEdge.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/edge/DirectedEdge.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/edge/DirectedEdge.java index b1d79b1..3ad7afa 100644 --- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/edge/DirectedEdge.java +++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/edge/DirectedEdge.java @@ -33,6 +33,11 @@ public class DirectedEdge implements Edge { private EdgeModel edgeModel; @Override + public String getId() { + return getEdgeModel().getEdgeId(); + } + + @Override public void setEdgeModel(EdgeModel edgeModel) { this.edgeModel = edgeModel; } http://git-wip-us.apache.org/repos/asf/airavata/blob/d1bb3827/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/edge/Edge.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/edge/Edge.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/edge/Edge.java index d1c340e..2ad098e 100644 --- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/edge/Edge.java +++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/edge/Edge.java @@ -32,6 +32,8 @@ import org.apache.airavata.workflow.core.dag.port.OutPort; public interface Edge { + public String getId(); + public void setEdgeModel(EdgeModel edgeModel); public EdgeModel getEdgeModel(); http://git-wip-us.apache.org/repos/asf/airavata/blob/d1bb3827/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/OutPort.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/OutPort.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/OutPort.java index 7ae3220..d12666e 100644 --- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/OutPort.java +++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/OutPort.java @@ -32,7 +32,7 @@ public interface OutPort extends Port { public OutputDataObjectType getOutputObject(); - public List<Edge> getOutEdges(); + public List<Edge> getEdges(); public void addEdge(Edge edge); http://git-wip-us.apache.org/repos/asf/airavata/blob/d1bb3827/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/OutPortImpl.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/OutPortImpl.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/OutPortImpl.java index 4d90308..c261279 100644 --- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/OutPortImpl.java +++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/OutPortImpl.java @@ -52,7 +52,7 @@ public class OutPortImpl implements OutPort { } @Override - public List<Edge> getOutEdges() { + public List<Edge> getEdges() { return this.outEdges; } http://git-wip-us.apache.org/repos/asf/airavata/blob/d1bb3827/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/parser/AiravataWorkflowBuilder.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/parser/AiravataWorkflowBuilder.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/parser/AiravataWorkflowBuilder.java deleted file mode 100644 index e50b245..0000000 --- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/parser/AiravataWorkflowBuilder.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.airavata.workflow.core.parser; - -import org.apache.airavata.model.application.io.InputDataObjectType; -import org.apache.airavata.model.application.io.OutputDataObjectType; -import org.apache.airavata.model.experiment.ExperimentModel; -import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory; -import org.apache.airavata.registry.cpi.*; -import org.apache.airavata.workflow.core.WorkflowBuilder; -import org.apache.airavata.workflow.core.WorkflowParser; -import org.apache.airavata.workflow.core.dag.edge.DirectedEdge; -import org.apache.airavata.workflow.core.dag.edge.Edge; -import org.apache.airavata.workflow.core.dag.nodes.ApplicationNode; -import org.apache.airavata.workflow.core.dag.nodes.ApplicationNodeImpl; -import org.apache.airavata.workflow.core.dag.nodes.InputNode; -import org.apache.airavata.workflow.core.dag.nodes.WorkflowNode; -import org.apache.airavata.workflow.core.dag.nodes.OutputNode; -import org.apache.airavata.workflow.core.dag.nodes.OutputNodeImpl; -import org.apache.airavata.workflow.core.dag.port.*; - -import java.util.*; - -public class AiravataWorkflowBuilder implements WorkflowBuilder { - - private String credentialToken ; - private WorkflowParser workflowParser; - private ExperimentModel experiment; - - - public AiravataWorkflowBuilder(String experimentId, String credentialToken, WorkflowParser workflowParser) throws RegistryException { - this.experiment = getExperiment(experimentId); - this.credentialToken = credentialToken; - this.workflowParser = workflowParser; - } - - public AiravataWorkflowBuilder(ExperimentModel experiment, String credentialToken , WorkflowParser workflowParser) { - this.credentialToken = credentialToken; - this.experiment = experiment; - this.workflowParser = workflowParser; - } - - @Override - public List<InputNode> build() throws Exception { - return parseWorkflow(getWorkflowFromExperiment(experiment)); - } - - @Override - public List<InputNode> build(String workflow) throws Exception { - return parseWorkflow(workflow); - } - - public List<InputNode> parseWorkflow(String workflow) throws Exception { - - List<InputNode> inputNodes = workflowParser.getInputNodes(); - List<ApplicationNode> applicationNodes = workflowParser.getApplicationNodes(); - List<Port> ports = workflowParser.getPorts(); - List<Edge> edges = workflowParser.getEdges(); - List<OutputNode> outputNodes = workflowParser.getOutputNodes(); - - // travel breath first and build relation between each workflow component - Queue<WorkflowNode> queue = new LinkedList<>(); - List<InputDataObjectType> experimentInputs = experiment.getExperimentInputs(); - Map<String,InputDataObjectType> inputDataMap=new HashMap<String, InputDataObjectType>(); - for (InputDataObjectType dataObjectType : experimentInputs) { - inputDataMap.put(dataObjectType.getName(), dataObjectType); - } - - return inputNodes; - } - - - private OutputDataObjectType getOutputDataObject(InputDataObjectType inputObject) { - OutputDataObjectType outputDataObjectType = new OutputDataObjectType(); - outputDataObjectType.setApplicationArgument(inputObject.getApplicationArgument()); - outputDataObjectType.setName(inputObject.getName()); - outputDataObjectType.setType(inputObject.getType()); - outputDataObjectType.setValue(inputObject.getValue()); - return outputDataObjectType; - } - - private ExperimentModel getExperiment(String experimentId) throws RegistryException { - Registry registry = RegistryFactory.getRegistry(); - return (ExperimentModel)registry.getExperimentCatalog().get(ExperimentCatalogModelType.EXPERIMENT, experimentId); - } - - private String getWorkflowFromExperiment(ExperimentModel experiment) throws RegistryException, AppCatalogException { - WorkflowCatalog workflowCatalog = getWorkflowCatalog(); - - // FIXME: return workflow string - return null; - } - - private WorkflowCatalog getWorkflowCatalog() throws AppCatalogException { - return RegistryFactory.getAppCatalog().getWorkflowCatalog(); - } - -} http://git-wip-us.apache.org/repos/asf/airavata/blob/d1bb3827/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/parser/JsonWorkflowParser.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/parser/JsonWorkflowParser.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/parser/JsonWorkflowParser.java index 59576b3..ede69e3 100644 --- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/parser/JsonWorkflowParser.java +++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/parser/JsonWorkflowParser.java @@ -16,12 +16,16 @@ import java.util.List; public class JsonWorkflowParser implements WorkflowParser{ private final String workflow; + private List<InputNode> inputs; + private List<OutputNode> outputs; + private List<ApplicationNode> applications; + private List<Port> ports; + private List<Edge> edges; public JsonWorkflowParser(String jsonWorkflowString) { workflow = jsonWorkflowString; } - @Override public void parse() throws Exception { // TODO parse json string and construct components http://git-wip-us.apache.org/repos/asf/airavata/blob/d1bb3827/modules/workflow/workflow-core/src/test/java/org/apache/airavata/workflow/core/parser/JsonWorkflowParserTest.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/test/java/org/apache/airavata/workflow/core/parser/JsonWorkflowParserTest.java b/modules/workflow/workflow-core/src/test/java/org/apache/airavata/workflow/core/parser/JsonWorkflowParserTest.java index 95a2579..712944d 100644 --- a/modules/workflow/workflow-core/src/test/java/org/apache/airavata/workflow/core/parser/JsonWorkflowParserTest.java +++ b/modules/workflow/workflow-core/src/test/java/org/apache/airavata/workflow/core/parser/JsonWorkflowParserTest.java @@ -109,8 +109,8 @@ public class JsonWorkflowParserTest { Assert.assertNotNull(node.getInputPorts().get(1).getEdge()); Assert.assertEquals(1, node.getOutputPorts().size()); - Assert.assertEquals(1, node.getOutputPorts().get(0).getOutEdges().size()); - Assert.assertNotNull(node.getOutputPorts().get(0).getOutEdges().get(0)); + Assert.assertEquals(1, node.getOutputPorts().get(0).getEdges().size()); + Assert.assertNotNull(node.getOutputPorts().get(0).getEdges().get(0)); } else if (wfNode instanceof OutputNode) { OutputNode outputNode = (OutputNode) wfNode; Assert.assertNotNull(outputNode.getInPort()); http://git-wip-us.apache.org/repos/asf/airavata/blob/d1bb3827/modules/workflow/workflow-core/src/test/resources/TestWorkflow.json ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/test/resources/TestWorkflow.json b/modules/workflow/workflow-core/src/test/resources/TestWorkflow.json new file mode 100644 index 0000000..5b64172 --- /dev/null +++ b/modules/workflow/workflow-core/src/test/resources/TestWorkflow.json @@ -0,0 +1,85 @@ +{ + "workflow" : { + "name" : "name", + "id" : "default_id", + "description" : "default description", + "version" : "version", + "applications" : [ + { "applicationId" : "appId_1", + "name" : "App Name", + "description" : "My app description", + "appType" : "MPI", + "inputs" : [ + { "name" : "appInputName_1", + "id" : "appInputNode_Id_1", + "dataType" : "STRING", + "defaultValue" : "defaultValue", + "description" : "App Input Description" }, + { "name" : "appInputName_2", + "id" : "appInputNode_Id_2", + "dataType" : "STRING", + "defaultValue" : "defaultValue", + "description" : "App Input Description" }], + "outputs" : [ + { "name" : "appOutputName_1", + "id" : "appOutputNode_Id_1", + "dataType" : "STRING", + "defaultValue" : "defaultValue", + "description" : "App Output Description" }, + { "name" : "appOutputName_2", + "id" : "appOutputNode_Id_2", + "dataType" : "STRING", + "defaultValue" : "defaultValue" }], + "position" : { "x" : 124 , "y" : 643 }, + "nodeId" : "applicationNodeId", + "parallelExecution" : "true", + "properties" : null } + ], + "workflowInputs" : [ + { "name" : "inputName_1", + "id" : "inputNode_Id_1", + "dataType" : "STRING", + "defaultValue" : "defaultValue", + "description" : "Input Description", + "position" : { "x" : 23 , "y" : 43 }, + "nodeId" : "defaultNodeId_1" }, + { "name" : "inputName_2", + "id" : "inputNode_Id_2", + "dataType" : "STRING", + "defaultValue" : "defaultValue", + "description" : "Input Description", + "position" : { "x" : 23 , "y" : 103 }, + "nodeId" : "defaultNodeId_2" } + ], + "workflowOutputs" : [ + { "name" : "outputName_1", + "id" : "outputNode_Id_1", + "dataType" : "STRING", + "defaultValue" : "defaultValue", + "description" : "Output Description", + "position" : { "x" : 423 , "y" : 43 }, + "nodeId" : "defaultOutputNodeId_1" }, + { "name" : "outputName_2", + "id" : "outputNode_Id_2", + "dataType" : "STRING", + "defaultValue" : "defaultValue", + "description" : "Output Description", + "position" : { "x" : 423 , "y" : 103 }, + "nodeId" : "defaultOutputNodeId_2" } + ], + "links" : [ + { "description" : "link desc", + "from" : { "nodeId" : "" , "ouputId" : "" }, + "to" : { "nodeId" : "" , "outputId" : "" }}, + { "description" : "link desc", + "from" : { "nodeId" : "" , "ouputId" : "" }, + "to" : { "nodeId" : "" , "outputId" : "" }}, + { "description" : "link desc", + "from" : { "nodeId" : "" , "ouputId" : "" }, + "to" : { "nodeId" : "" , "outputId" : "" }}, + { "description" : "link desc", + "from" : { "nodeId" : "" , "ouputId" : "" }, + "to" : { "nodeId" : "" , "outputId" : "" }} + ] + } +}
