http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java index 6dcb8bd..edfa306 100644 --- a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java +++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java @@ -21,10 +21,16 @@ package org.apache.airavata.simple.workflow.engine; -import com.google.common.eventbus.EventBus; -import com.google.common.eventbus.Subscribe; -import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; +import org.apache.airavata.common.exception.AiravataException; +import org.apache.airavata.common.utils.AiravataUtils; +import org.apache.airavata.messaging.core.MessageContext; +import org.apache.airavata.messaging.core.MessageHandler; +import org.apache.airavata.messaging.core.MessagingConstants; +import org.apache.airavata.messaging.core.impl.RabbitMQProcessPublisher; +import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer; import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType; +import org.apache.airavata.model.messaging.event.MessageType; +import org.apache.airavata.model.messaging.event.ProcessSubmitEvent; import org.apache.airavata.model.messaging.event.TaskIdentifier; import org.apache.airavata.model.messaging.event.TaskOutputChangeEvent; import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent; @@ -45,11 +51,11 @@ import org.apache.airavata.simple.workflow.engine.dag.edge.Edge; import org.apache.airavata.simple.workflow.engine.dag.nodes.ApplicationNode; import org.apache.airavata.simple.workflow.engine.dag.nodes.NodeState; import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowInputNode; -import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowOutputNode; -import org.apache.airavata.simple.workflow.engine.parser.AiravataDefaultParser; import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowNode; +import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowOutputNode; import org.apache.airavata.simple.workflow.engine.dag.port.InPort; import org.apache.airavata.simple.workflow.engine.dag.port.OutPort; +import org.apache.airavata.simple.workflow.engine.parser.AiravataWorkflowParser; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,32 +70,36 @@ import java.util.concurrent.ConcurrentHashMap; public class SimpleWorkflowInterpreter implements Runnable{ private static final Logger log = LoggerFactory.getLogger(SimpleWorkflowInterpreter.class); - private List<WorkflowInputNode> workflowInputNodes; private Experiment experiment; private String credentialToken; + private String gatewayName; + private Map<String, WorkflowNode> readList = new ConcurrentHashMap<String, WorkflowNode>(); private Map<String, WorkflowNode> waitingList = new ConcurrentHashMap<String, WorkflowNode>(); - private Map<String, ProcessPack> processingQueue = new ConcurrentHashMap<String, ProcessPack>(); - private Map<String, ProcessPack> completeList = new HashMap<String, ProcessPack>(); + private Map<String, ProcessContext> processingQueue = new ConcurrentHashMap<String, ProcessContext>(); + private Map<String, ProcessContext> completeList = new HashMap<String, ProcessContext>(); private Registry registry; - private EventBus eventBus = new EventBus(); private List<WorkflowOutputNode> completeWorkflowOutputs = new ArrayList<WorkflowOutputNode>(); + private RabbitMQProcessPublisher publisher; + private RabbitMQStatusConsumer statusConsumer; + private String consumerId; - public SimpleWorkflowInterpreter(String experimentId, String credentialToken) throws RegistryException { + public SimpleWorkflowInterpreter(String experimentId, String credentialToken, String gatewayName, RabbitMQProcessPublisher publisher) throws RegistryException { + this.gatewayName = gatewayName; setExperiment(experimentId); this.credentialToken = credentialToken; + this.publisher = publisher; } - public SimpleWorkflowInterpreter(Experiment experiment, String credentialStoreToken) { - // read the workflow file and build the topology to a DAG. Then execute that dag - // get workflowInputNode list and start processing - // next() will return ready task and block the thread if no task in ready state. + public SimpleWorkflowInterpreter(Experiment experiment, String credentialStoreToken, String gatewayName, RabbitMQProcessPublisher publisher) { + this.gatewayName = gatewayName; this.experiment = experiment; this.credentialToken = credentialStoreToken; + this.publisher = publisher; } @@ -97,11 +107,15 @@ public class SimpleWorkflowInterpreter implements Runnable{ // process workflow input nodes // WorkflowFactoryImpl wfFactory = WorkflowFactoryImpl.getInstance(); // WorkflowParser workflowParser = wfFactory.getWorkflowParser(experiment.getExperimentID(), credentialToken); - WorkflowParser workflowParser = new AiravataDefaultParser(experiment, credentialToken); + WorkflowParser workflowParser = new AiravataWorkflowParser(experiment, credentialToken); log.debug("Initialized workflow parser"); setWorkflowInputNodes(workflowParser.parse()); log.debug("Parsed the workflow and got the workflow input nodes"); processWorkflowInputNodes(getWorkflowInputNodes()); + + + statusConsumer = new RabbitMQStatusConsumer(); + consumerId = statusConsumer.listen(new TaskMessageHandler()); } // try to remove synchronization tag @@ -116,56 +130,31 @@ public class SimpleWorkflowInterpreter implements Runnable{ } WorkflowNodeDetails workflowNodeDetails = createWorkflowNodeDetails(readyNode); TaskDetails process = getProcess(workflowNodeDetails); - ProcessPack processPack = new ProcessPack(readyNode, workflowNodeDetails, process); - addToProcessingQueue(processPack); -// publishToProcessQueue(process); - publishToProcessQueue(processPack); + ProcessContext processContext = new ProcessContext(readyNode, workflowNodeDetails, process); + addToProcessingQueue(processContext); + publishToProcessQueue(process); +// publishToProcessQueue(processPack); } catch (RegistryException e) { // FIXME : handle this exception + } catch (AiravataException e) { + log.error("Error while publishing process to the process queue"); } } } - private void publishToProcessQueue(TaskDetails process) { - Thread thread = new Thread(new TempPublisher(process, eventBus)); - thread.start(); - //TODO: publish to process queue. - } + private void publishToProcessQueue(TaskDetails process) throws AiravataException { + ProcessSubmitEvent processSubmitEvent = new ProcessSubmitEvent(); + processSubmitEvent.setCredentialToken(credentialToken); + processSubmitEvent.setTaskId(process.getTaskID()); + MessageContext messageContext = new MessageContext(processSubmitEvent, MessageType.TASK, process.getTaskID(), null); + messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); + publisher.publish(messageContext); - // TODO : remove this test method - private void publishToProcessQueue(ProcessPack process) { - WorkflowNode workflowNode = process.getWorkflowNode(); - if (workflowNode instanceof ApplicationNode) { - ApplicationNode applicationNode = (ApplicationNode) workflowNode; - List<InPort> inputPorts = applicationNode.getInputPorts(); - if (applicationNode.getName().equals("Add")) { - applicationNode.getOutputPorts().get(0).getOutputObject().setValue(String.valueOf( - Integer.parseInt(inputPorts.get(0).getInputObject().getValue()) + Integer.parseInt(inputPorts.get(1).getInputObject().getValue()))); - } else if (applicationNode.getName().equals("Multiply")) { - applicationNode.getOutputPorts().get(0).getOutputObject().setValue(String.valueOf( - Integer.parseInt(inputPorts.get(0).getInputObject().getValue()) * Integer.parseInt(inputPorts.get(1).getInputObject().getValue()))); - } else if (applicationNode.getName().equals("Subtract")) { - applicationNode.getOutputPorts().get(0).getOutputObject().setValue(String.valueOf( - Integer.parseInt(inputPorts.get(0).getInputObject().getValue()) - Integer.parseInt(inputPorts.get(1).getInputObject().getValue()))); - } else { - throw new RuntimeException("Invalid Application name"); - } - for (Edge edge : applicationNode.getOutputPorts().get(0).getOutEdges()) { - WorkflowUtil.copyValues(applicationNode.getOutputPorts().get(0).getOutputObject(), edge.getToPort().getInputObject()); - if (edge.getToPort().getNode().isReady()) { - addToReadyQueue(edge.getToPort().getNode()); - } else { - addToWaitingQueue(edge.getToPort().getNode()); - } - } - } else if (workflowNode instanceof WorkflowOutputNode) { - WorkflowOutputNode wfOutputNode = (WorkflowOutputNode) workflowNode; - throw new RuntimeException("Workflow output node in processing queue"); - } - - processingQueue.remove(process.getTaskDetails().getTaskID()); +// Thread thread = new Thread(new TempPublisher(process, eventBus)); +// thread.start(); + //TODO: publish to process queue. } private TaskDetails getProcess(WorkflowNodeDetails wfNodeDetails) throws RegistryException { @@ -242,12 +231,6 @@ public class SimpleWorkflowInterpreter implements Runnable{ this.workflowInputNodes = workflowInputNodes; } - - private List<WorkflowInputNode> parseWorkflowDescription(){ - return null; - } - - private Registry getRegistry() throws RegistryException { if (registry==null){ registry = RegistryFactory.getDefaultRegistry(); @@ -265,93 +248,6 @@ public class SimpleWorkflowInterpreter implements Runnable{ getRegistry().update(RegistryModelType.WORKFLOW_NODE_STATUS, status, wfNodeDetails.getNodeInstanceId()); } - @Subscribe - public void taskOutputChanged(TaskOutputChangeEvent taskOutputEvent){ - String taskId = taskOutputEvent.getTaskIdentity().getTaskId(); - log.debug("Task Output changed event received for workflow node : " + - taskOutputEvent.getTaskIdentity().getWorkflowNodeId() + ", task : " + taskId); - ProcessPack processPack = processingQueue.get(taskId); - Set<WorkflowNode> tempWfNodeSet = new HashSet<WorkflowNode>(); - if (processPack != null) { - WorkflowNode workflowNode = processPack.getWorkflowNode(); - if (workflowNode instanceof ApplicationNode) { - ApplicationNode applicationNode = (ApplicationNode) workflowNode; - // Workflow node can have one to many output ports and each output port can have one to many links - for (OutPort outPort : applicationNode.getOutputPorts()) { - for (OutputDataObjectType outputDataObjectType : taskOutputEvent.getOutput()) { - if (outPort.getOutputObject().getName().equals(outputDataObjectType.getName())) { - outPort.getOutputObject().setValue(outputDataObjectType.getValue()); - break; - } - } - for (Edge edge : outPort.getOutEdges()) { - WorkflowUtil.copyValues(outPort.getOutputObject(), edge.getToPort().getInputObject()); - if (edge.getToPort().getNode().isReady()) { - addToReadyQueue(edge.getToPort().getNode()); - } - } - } - } - processingQueue.remove(taskId); - log.debug("removed task from processing queue : " + taskId); - } - - } - - @Subscribe - public void taskStatusChanged(TaskStatusChangeEvent taskStatus){ - String taskId = taskStatus.getTaskIdentity().getTaskId(); - ProcessPack processPack = processingQueue.get(taskId); - if (processPack != null) { - WorkflowNodeState wfNodeState = WorkflowNodeState.UNKNOWN; - switch (taskStatus.getState()) { - case WAITING: - break; - case STARTED: - break; - case PRE_PROCESSING: - processPack.getWorkflowNode().setState(NodeState.PRE_PROCESSING); - break; - case INPUT_DATA_STAGING: - processPack.getWorkflowNode().setState(NodeState.PRE_PROCESSING); - break; - case EXECUTING: - processPack.getWorkflowNode().setState(NodeState.EXECUTING); - break; - case OUTPUT_DATA_STAGING: - processPack.getWorkflowNode().setState(NodeState.POST_PROCESSING); - break; - case POST_PROCESSING: - processPack.getWorkflowNode().setState(NodeState.POST_PROCESSING); - break; - case COMPLETED: - processPack.getWorkflowNode().setState(NodeState.EXECUTED); - break; - case FAILED: - processPack.getWorkflowNode().setState(NodeState.FAILED); - break; - case UNKNOWN: - break; - case CONFIGURING_WORKSPACE: - break; - case CANCELED: - case CANCELING: - processPack.getWorkflowNode().setState(NodeState.FAILED); - break; - default: - break; - } - if (wfNodeState != WorkflowNodeState.UNKNOWN) { - try { - updateWorkflowNodeStatus(processPack.getWfNodeDetails(), wfNodeState); - } catch (RegistryException e) { - // TODO: handle this. - } - } - } - - } - /** * Remove the workflow node from waiting queue and add it to the ready queue. * @param workflowNode - Workflow Node @@ -368,16 +264,16 @@ public class SimpleWorkflowInterpreter implements Runnable{ /** * First remove the node from ready list and then add the WfNodeContainer to the process queue. * Note that underline data structure of the process queue is a Map. - * @param processPack - has both workflow and correspond workflowNodeDetails and TaskDetails + * @param processContext - has both workflow and correspond workflowNodeDetails and TaskDetails */ - private synchronized void addToProcessingQueue(ProcessPack processPack) { - readList.remove(processPack.getWorkflowNode().getId()); - processingQueue.put(processPack.getTaskDetails().getTaskID(), processPack); + private synchronized void addToProcessingQueue(ProcessContext processContext) { + readList.remove(processContext.getWorkflowNode().getId()); + processingQueue.put(processContext.getTaskDetails().getTaskID(), processContext); } - private synchronized void addToCompleteQueue(ProcessPack processPack) { - processingQueue.remove(processPack.getTaskDetails().getTaskID()); - completeList.put(processPack.getTaskDetails().getTaskID(), processPack); + private synchronized void addToCompleteQueue(ProcessContext processContext) { + processingQueue.remove(processContext.getTaskDetails().getTaskID()); + completeList.put(processContext.getTaskDetails().getTaskID(), processContext); } @@ -388,7 +284,6 @@ public class SimpleWorkflowInterpreter implements Runnable{ @Override public void run() { - // TODO: Auto generated method body. try { log.debug("Launching workflow"); launchWorkflow(); @@ -396,8 +291,11 @@ public class SimpleWorkflowInterpreter implements Runnable{ processReadyList(); Thread.sleep(1000); } + log.info("Successfully launched workflow for experiment : " + getExperiment().getExperimentID()); + statusConsumer.stopListen(consumerId); + log.info("Successfully un-bind status consumer for experiment " + getExperiment().getExperimentID()); } catch (Exception e) { - e.printStackTrace(); + //TODO - handle this. } } @@ -406,65 +304,129 @@ public class SimpleWorkflowInterpreter implements Runnable{ log.debug("Retrieve Experiment for experiment id : " + experimentId); } + class TaskMessageHandler implements MessageHandler{ - class TempPublisher implements Runnable { - private TaskDetails tempTaskDetails; - private EventBus tempEventBus; - - public TempPublisher(TaskDetails tempTaskDetails, EventBus tempEventBus) { - this.tempTaskDetails = tempTaskDetails; - this.tempEventBus = tempEventBus; + @Override + public Map<String, Object> getProperties() { + Map<String, Object> props = new HashMap<String, Object>(); + String gatewayId = "*"; + String experimentId = getExperiment().getExperimentID(); + List<String> routingKeys = new ArrayList<String>(); +// routingKeys.add(gatewayName+ "." + getExperiment().getExperimentID() + ".*"); + routingKeys.add(gatewayId); + routingKeys.add(gatewayId + "." + experimentId); + routingKeys.add(gatewayId + "." + experimentId+ ".*"); + routingKeys.add(gatewayId + "." + experimentId+ ".*.*"); + props.put(MessagingConstants.RABBIT_ROUTING_KEY, routingKeys); + return props; } @Override - public void run() { - try { - TaskIdentifier identifier = new TaskIdentifier(tempTaskDetails.getTaskID(), null, null, null); - TaskStatusChangeEvent statusChangeEvent = new TaskStatusChangeEvent(TaskState.PRE_PROCESSING, identifier); - tempEventBus.post(statusChangeEvent); - Thread.sleep(1000); - statusChangeEvent = new TaskStatusChangeEvent(TaskState.WAITING, identifier); - tempEventBus.post(statusChangeEvent); - Thread.sleep(1000); - statusChangeEvent = new TaskStatusChangeEvent(TaskState.INPUT_DATA_STAGING, identifier); - tempEventBus.post(statusChangeEvent); - Thread.sleep(1000); - statusChangeEvent = new TaskStatusChangeEvent(TaskState.STARTED, identifier); - tempEventBus.post(statusChangeEvent); - Thread.sleep(1000); - statusChangeEvent = new TaskStatusChangeEvent(TaskState.EXECUTING, identifier); - tempEventBus.post(statusChangeEvent); - Thread.sleep(1000); - statusChangeEvent = new TaskStatusChangeEvent(TaskState.POST_PROCESSING, identifier); - tempEventBus.post(statusChangeEvent); - Thread.sleep(1000); - statusChangeEvent = new TaskStatusChangeEvent(TaskState.OUTPUT_DATA_STAGING, identifier); - tempEventBus.post(statusChangeEvent); - Thread.sleep(1000); - statusChangeEvent = new TaskStatusChangeEvent(TaskState.COMPLETED, identifier); - tempEventBus.post(statusChangeEvent); - Thread.sleep(1000); + public void onMessage(MessageContext msgCtx) { + String message; + if (msgCtx.getType() == MessageType.TASK) { + TaskStatusChangeEvent event = (TaskStatusChangeEvent) msgCtx.getEvent(); + TaskIdentifier taskIdentifier = event.getTaskIdentity(); + handleTaskStatusChangeEvent(event); + message = "Received task output change event , expId : " + taskIdentifier.getExperimentId() + ", taskId : " + taskIdentifier.getTaskId() + ", workflow node Id : " + taskIdentifier.getWorkflowNodeId(); + log.debug(message); + }else if (msgCtx.getType() == MessageType.TASKOUTPUT) { + TaskOutputChangeEvent event = (TaskOutputChangeEvent) msgCtx.getEvent(); + TaskIdentifier taskIdentifier = event.getTaskIdentity(); + handleTaskOutputChangeEvent(event); + message = "Received task output change event , expId : " + taskIdentifier.getExperimentId() + ", taskId : " + taskIdentifier.getTaskId() + ", workflow node Id : " + taskIdentifier.getWorkflowNodeId(); + log.debug(message); + } else { + // not interesting, ignores + } + } - List<InputDataObjectType> applicationInputs = tempTaskDetails.getApplicationInputs(); - List<OutputDataObjectType> applicationOutputs = tempTaskDetails.getApplicationOutputs(); - log.info("************** Task output change event fired for application id :" + tempTaskDetails.getApplicationId()); - if (tempTaskDetails.getApplicationId().equals("Add") || tempTaskDetails.getApplicationId().equals("Add_2")) { - applicationOutputs.get(0).setValue((Integer.parseInt(applicationInputs.get(0).getValue()) + - Integer.parseInt(applicationInputs.get(1).getValue())) + ""); - } else if (tempTaskDetails.getApplicationId().equals("Subtract")) { - applicationOutputs.get(0).setValue((Integer.parseInt(applicationInputs.get(0).getValue()) - - Integer.parseInt(applicationInputs.get(1).getValue())) + ""); - } else if (tempTaskDetails.getApplicationId().equals("Multiply")) { - applicationOutputs.get(0).setValue((Integer.parseInt(applicationInputs.get(0).getValue()) * - Integer.parseInt(applicationInputs.get(1).getValue())) + ""); + private void handleTaskOutputChangeEvent(TaskOutputChangeEvent taskOutputChangeEvent) { + + String taskId = taskOutputChangeEvent.getTaskIdentity().getTaskId(); + log.debug("Task Output changed event received for workflow node : " + + taskOutputChangeEvent.getTaskIdentity().getWorkflowNodeId() + ", task : " + taskId); + ProcessContext processContext = processingQueue.get(taskId); + Set<WorkflowNode> tempWfNodeSet = new HashSet<WorkflowNode>(); + if (processContext != null) { + WorkflowNode workflowNode = processContext.getWorkflowNode(); + if (workflowNode instanceof ApplicationNode) { + ApplicationNode applicationNode = (ApplicationNode) workflowNode; + // Workflow node can have one to many output ports and each output port can have one to many links + for (OutPort outPort : applicationNode.getOutputPorts()) { + for (OutputDataObjectType outputDataObjectType : taskOutputChangeEvent.getOutput()) { + if (outPort.getOutputObject().getName().equals(outputDataObjectType.getName())) { + outPort.getOutputObject().setValue(outputDataObjectType.getValue()); + break; + } + } + for (Edge edge : outPort.getOutEdges()) { + edge.getToPort().getInputObject().setValue(outPort.getOutputObject().getValue()); + if (edge.getToPort().getNode().isReady()) { + addToReadyQueue(edge.getToPort().getNode()); + } + } + } } - TaskOutputChangeEvent taskOutputChangeEvent = new TaskOutputChangeEvent(applicationOutputs, identifier); - eventBus.post(taskOutputChangeEvent); + addToCompleteQueue(processContext); + log.debug("removed task from processing queue : " + taskId); + } + } - } catch (InterruptedException e) { - log.error("Thread was interrupted while sleeping"); + private void handleTaskStatusChangeEvent(TaskStatusChangeEvent taskStatusChangeEvent) { + TaskState taskState = taskStatusChangeEvent.getState(); + TaskIdentifier taskIdentity = taskStatusChangeEvent.getTaskIdentity(); + String taskId = taskIdentity.getTaskId(); + ProcessContext processContext = processingQueue.get(taskId); + if (processContext != null) { + WorkflowNodeState wfNodeState = WorkflowNodeState.UNKNOWN; + switch (taskState) { + case WAITING: + break; + case STARTED: + break; + case PRE_PROCESSING: + processContext.getWorkflowNode().setState(NodeState.PRE_PROCESSING); + break; + case INPUT_DATA_STAGING: + processContext.getWorkflowNode().setState(NodeState.PRE_PROCESSING); + break; + case EXECUTING: + processContext.getWorkflowNode().setState(NodeState.EXECUTING); + break; + case OUTPUT_DATA_STAGING: + processContext.getWorkflowNode().setState(NodeState.POST_PROCESSING); + break; + case POST_PROCESSING: + processContext.getWorkflowNode().setState(NodeState.POST_PROCESSING); + break; + case COMPLETED: + processContext.getWorkflowNode().setState(NodeState.EXECUTED); + break; + case FAILED: + processContext.getWorkflowNode().setState(NodeState.FAILED); + break; + case UNKNOWN: + break; + case CONFIGURING_WORKSPACE: + break; + case CANCELED: + case CANCELING: + processContext.getWorkflowNode().setState(NodeState.FAILED); + break; + default: + break; + } + if (wfNodeState != WorkflowNodeState.UNKNOWN) { + try { + updateWorkflowNodeStatus(processContext.getWfNodeDetails(), wfNodeState); + } catch (RegistryException e) { + // TODO: handle this. + } + } } } } + }
http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactoryImpl.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactoryImpl.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactoryImpl.java index 116a10d..b12260d 100644 --- a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactoryImpl.java +++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactoryImpl.java @@ -22,7 +22,7 @@ package org.apache.airavata.simple.workflow.engine; import org.apache.airavata.registry.cpi.RegistryException; -import org.apache.airavata.simple.workflow.engine.parser.AiravataDefaultParser; +import org.apache.airavata.simple.workflow.engine.parser.AiravataWorkflowParser; /** * Singleton class, only one instance can exist in runtime. @@ -55,7 +55,7 @@ public class WorkflowFactoryImpl implements WorkflowFactory { public WorkflowParser getWorkflowParser(String experimentId, String credentialToken) { if (workflowParser == null) { try { - workflowParser = new AiravataDefaultParser(experimentId, credentialToken); + workflowParser = new AiravataWorkflowParser(experimentId, credentialToken); } catch (RegistryException e) { // TODO : handle this scenario } http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/AiravataDefaultParser.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/AiravataDefaultParser.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/AiravataDefaultParser.java deleted file mode 100644 index 644eda6..0000000 --- a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/AiravataDefaultParser.java +++ /dev/null @@ -1,293 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.airavata.simple.workflow.engine.parser; - -import org.airavata.appcatalog.cpi.AppCatalogException; -import org.airavata.appcatalog.cpi.WorkflowCatalog; -import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory; -import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; -import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType; -import org.apache.airavata.model.workspace.experiment.Experiment; -import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory; -import org.apache.airavata.registry.cpi.Registry; -import org.apache.airavata.registry.cpi.RegistryException; -import org.apache.airavata.registry.cpi.RegistryModelType; -import org.apache.airavata.simple.workflow.engine.dag.nodes.ApplicationNodeImpl; -import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowInputNode; -import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowInputNodeImpl; -import org.apache.airavata.simple.workflow.engine.dag.port.OutPortImpl; -import org.apache.airavata.workflow.model.component.ComponentException; -import org.apache.airavata.workflow.model.component.system.ConstantComponent; -import org.apache.airavata.workflow.model.component.system.InputComponent; -import org.apache.airavata.workflow.model.component.system.S3InputComponent; -import org.apache.airavata.workflow.model.graph.DataEdge; -import org.apache.airavata.workflow.model.graph.DataPort; -import org.apache.airavata.workflow.model.graph.GraphException; -import org.apache.airavata.workflow.model.graph.Node; -import org.apache.airavata.workflow.model.graph.impl.NodeImpl; -import org.apache.airavata.workflow.model.graph.system.OutputNode; -import org.apache.airavata.workflow.model.graph.system.SystemDataPort; -import org.apache.airavata.workflow.model.graph.ws.WSNode; -import org.apache.airavata.workflow.model.graph.ws.WSPort; -import org.apache.airavata.workflow.model.wf.Workflow; -import org.apache.airavata.simple.workflow.engine.WorkflowParser; -import org.apache.airavata.simple.workflow.engine.dag.edge.DirectedEdge; -import org.apache.airavata.simple.workflow.engine.dag.edge.Edge; -import org.apache.airavata.simple.workflow.engine.dag.nodes.ApplicationNode; -import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowNode; -import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowOutputNode; -import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowOutputNodeImpl; -import org.apache.airavata.simple.workflow.engine.dag.port.InPort; -import org.apache.airavata.simple.workflow.engine.dag.port.InputPortIml; -import org.apache.airavata.simple.workflow.engine.dag.port.OutPort; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -public class AiravataDefaultParser implements WorkflowParser { - - private String credentialToken ; - private Workflow workflow; - - - private Experiment experiment; - private Map<String, WorkflowNode> wfNodes = new HashMap<String, WorkflowNode>(); - - - public AiravataDefaultParser(String experimentId, String credentialToken) throws RegistryException { - this.experiment = getExperiment(experimentId); - this.credentialToken = credentialToken; - } - - public AiravataDefaultParser(Experiment experiment, String credentialToken) { - this.credentialToken = credentialToken; - this.experiment = experiment; - } - - @Override - public List<WorkflowInputNode> parse() throws RegistryException, AppCatalogException, - ComponentException, GraphException { - return parseWorkflow(getWorkflowFromExperiment(experiment)); - } - - public List<WorkflowInputNode> parseWorkflow(Workflow workflow) { - List<Node> gNodes = getInputNodes(workflow); - List<WorkflowInputNode> wfInputNodes = new ArrayList<WorkflowInputNode>(); - List<PortContainer> portContainers = new ArrayList<PortContainer>(); - List<InputDataObjectType> experimentInputs = experiment.getExperimentInputs(); - Map<String,InputDataObjectType> inputDataMap=new HashMap<String, InputDataObjectType>(); - WorkflowInputNode wfInputNode = null; - for (InputDataObjectType dataObjectType : experimentInputs) { - inputDataMap.put(dataObjectType.getName(), dataObjectType); - } - for (Node gNode : gNodes) { - wfInputNode = new WorkflowInputNodeImpl(gNode.getID(), gNode.getName()); - wfInputNode.setInputObject(inputDataMap.get(wfInputNode.getName())); - if (wfInputNode.getInputObject() == null) { - // TODO: throw an error and exit. - } - portContainers.addAll(processOutPorts(gNode, wfInputNode)); - wfInputNodes.add(wfInputNode); - } - - // while port container is not empty iterate graph and build the workflow DAG. - buildModel(portContainers); - - return wfInputNodes; - } - - private void buildModel(List<PortContainer> portContainerList) { - // end condition of recursive call. - if (portContainerList == null || portContainerList.isEmpty()) { - return ; - } - DataPort dataPort = null; - InPort inPort = null; - ApplicationNode wfApplicationNode = null; - WorkflowOutputNode wfOutputNode = null; - List<PortContainer> nextPortContainerList = new ArrayList<PortContainer>(); - for (PortContainer portContainer : portContainerList) { - dataPort = portContainer.getDataPort(); - inPort = portContainer.getInPort(); - Node node = dataPort.getNode(); - if (node instanceof WSNode) { - WSNode wsNode = (WSNode) node; - WorkflowNode wfNode = wfNodes.get(wsNode.getID()); - if (wfNode == null) { - wfApplicationNode = createApplicationNode(wsNode); - wfNodes.put(wfApplicationNode.getId(), wfApplicationNode); - nextPortContainerList.addAll(processOutPorts(wsNode, wfApplicationNode)); - } else if (wfNode instanceof ApplicationNode) { - wfApplicationNode = (ApplicationNode) wfNode; - } else { - // TODO : handle this scenario - } - inPort.setNode(wfApplicationNode); - wfApplicationNode.addInPort(inPort); - - }else if (node instanceof OutputNode) { - OutputNode oNode = (OutputNode) node; - wfOutputNode = createWorkflowOutputNode(oNode); - wfOutputNode.setInPort(inPort); - inPort.setNode(wfOutputNode); - wfNodes.put(wfOutputNode.getId(), wfOutputNode); - } - } - buildModel(nextPortContainerList); - - } - - private WorkflowOutputNode createWorkflowOutputNode(OutputNode oNode) { - WorkflowOutputNodeImpl workflowOutputNode = new WorkflowOutputNodeImpl(oNode.getID(), oNode.getName()); - OutputDataObjectType outputDataObjectType = new OutputDataObjectType(); - outputDataObjectType.setType(oNode.getParameterType()); - workflowOutputNode.setOutputObject(outputDataObjectType); - return workflowOutputNode; - } - - private ApplicationNode createApplicationNode(WSNode wsNode) { - ApplicationNode applicationNode = new ApplicationNodeImpl(wsNode.getID(), - wsNode.getComponent().getApplication().getName(), - wsNode.getComponent().getApplication().getApplicationId()); - return applicationNode; - } - - private List<PortContainer> processOutPorts(Node node, WorkflowNode wfNode) { - OutPort outPort ; - Edge edge; - InPort inPort = null; - List<PortContainer> portContainers = new ArrayList<PortContainer>(); - for (DataPort dataPort : node.getOutputPorts()) { - outPort = createOutPort(dataPort); - for (DataEdge dataEdge : dataPort.getEdges()) { - edge = new DirectedEdge(); - edge.setFromPort(outPort); - outPort.addEdge(edge); - inPort = createInPort(dataEdge.getToPort()); - edge.setToPort(inPort); - inPort.addEdge(edge); - portContainers.add(new PortContainer(dataEdge.getToPort(), inPort)); - } - outPort.setNode(wfNode); - if (wfNode instanceof WorkflowInputNode) { - WorkflowInputNode workflowInputNode = (WorkflowInputNode) wfNode; - workflowInputNode.setOutPort(outPort); - } else if (wfNode instanceof ApplicationNode) { - ApplicationNode applicationNode = ((ApplicationNode) wfNode); - applicationNode.addOutPort(outPort); - } - } - return portContainers; - } - - private OutPort createOutPort(DataPort dataPort) { - OutPortImpl outPort = new OutPortImpl(dataPort.getID()); - OutputDataObjectType outputDataObjectType = new OutputDataObjectType(); - if (dataPort instanceof WSPort) { - WSPort wsPort = (WSPort) dataPort; - outputDataObjectType.setName(wsPort.getFromNode().getName()); - outputDataObjectType.setType(wsPort.getType()); - }else if (dataPort instanceof SystemDataPort) { - SystemDataPort sysPort = (SystemDataPort) dataPort; - outputDataObjectType.setName(sysPort.getFromNode().getName()); - outputDataObjectType.setType(sysPort.getType()); - } - - outPort.setOutputObject(outputDataObjectType); - return outPort; - } - - private InPort createInPort(DataPort toPort) { - InPort inPort = new InputPortIml(toPort.getID()); - InputDataObjectType inputDataObjectType = new InputDataObjectType(); - if (toPort instanceof WSPort) { - WSPort wsPort = (WSPort) toPort; - inputDataObjectType.setName(wsPort.getName()); - inputDataObjectType.setType(wsPort.getType()); - inputDataObjectType.setApplicationArgument(wsPort.getComponentPort().getApplicationArgument()); - inputDataObjectType.setIsRequired(!wsPort.getComponentPort().isOptional()); - inputDataObjectType.setInputOrder(wsPort.getComponentPort().getInputOrder()); - - inPort.setDefaultValue(wsPort.getComponentPort().getDefaultValue()); - }else if (toPort instanceof SystemDataPort) { - SystemDataPort sysPort = (SystemDataPort) toPort; - inputDataObjectType.setName(sysPort.getName()); - inputDataObjectType.setType(sysPort.getType()); - } - inPort.setInputObject(inputDataObjectType); - return inPort; - } - - private InputDataObjectType getInputDataObject(DataPort dataPort) { - InputDataObjectType inputDataObject = new InputDataObjectType(); - inputDataObject.setName(dataPort.getName()); - if (dataPort instanceof WSPort) { - WSPort port = (WSPort) dataPort; - inputDataObject.setInputOrder(port.getComponentPort().getInputOrder()); - inputDataObject.setApplicationArgument(port.getComponentPort().getApplicationArgument() == null ? - "" : port.getComponentPort().getApplicationArgument()); - inputDataObject.setType(dataPort.getType()); - } - return inputDataObject; - } - - private OutputDataObjectType getOutputDataObject(InputDataObjectType inputObject) { - OutputDataObjectType outputDataObjectType = new OutputDataObjectType(); - outputDataObjectType.setApplicationArgument(inputObject.getApplicationArgument()); - outputDataObjectType.setName(inputObject.getName()); - outputDataObjectType.setType(inputObject.getType()); - outputDataObjectType.setValue(inputObject.getValue()); - return outputDataObjectType; - } - - private Experiment getExperiment(String experimentId) throws RegistryException { - Registry registry = RegistryFactory.getDefaultRegistry(); - return (Experiment)registry.get(RegistryModelType.EXPERIMENT, experimentId); - } - - private Workflow getWorkflowFromExperiment(Experiment experiment) throws RegistryException, AppCatalogException, GraphException, ComponentException { - WorkflowCatalog workflowCatalog = getWorkflowCatalog(); - return new Workflow(workflowCatalog.getWorkflow(experiment.getApplicationId()).getGraph()); - } - - private WorkflowCatalog getWorkflowCatalog() throws AppCatalogException { - return AppCatalogFactory.getAppCatalog().getWorkflowCatalog(); - } - - private ArrayList<Node> getInputNodes(Workflow wf) { - ArrayList<Node> list = new ArrayList<Node>(); - List<NodeImpl> nodes = wf.getGraph().getNodes(); - for (Node node : nodes) { - String name = node.getComponent().getName(); - if (InputComponent.NAME.equals(name) || ConstantComponent.NAME.equals(name) || S3InputComponent.NAME.equals(name)) { - list.add(node); - } - } - return list; - } - - public Map<String, WorkflowNode> getWfNodes() { - return wfNodes; - } -} http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParser.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParser.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParser.java new file mode 100644 index 0000000..673fbdc --- /dev/null +++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParser.java @@ -0,0 +1,291 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.simple.workflow.engine.parser; + +import org.airavata.appcatalog.cpi.AppCatalogException; +import org.airavata.appcatalog.cpi.WorkflowCatalog; +import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory; +import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; +import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType; +import org.apache.airavata.model.workspace.experiment.Experiment; +import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory; +import org.apache.airavata.registry.cpi.Registry; +import org.apache.airavata.registry.cpi.RegistryException; +import org.apache.airavata.registry.cpi.RegistryModelType; +import org.apache.airavata.simple.workflow.engine.dag.nodes.ApplicationNodeImpl; +import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowInputNode; +import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowInputNodeImpl; +import org.apache.airavata.simple.workflow.engine.dag.port.OutPortImpl; +import org.apache.airavata.workflow.model.component.ComponentException; +import org.apache.airavata.workflow.model.component.system.ConstantComponent; +import org.apache.airavata.workflow.model.component.system.InputComponent; +import org.apache.airavata.workflow.model.component.system.S3InputComponent; +import org.apache.airavata.workflow.model.graph.DataEdge; +import org.apache.airavata.workflow.model.graph.DataPort; +import org.apache.airavata.workflow.model.graph.GraphException; +import org.apache.airavata.workflow.model.graph.Node; +import org.apache.airavata.workflow.model.graph.impl.NodeImpl; +import org.apache.airavata.workflow.model.graph.system.OutputNode; +import org.apache.airavata.workflow.model.graph.system.SystemDataPort; +import org.apache.airavata.workflow.model.graph.ws.WSNode; +import org.apache.airavata.workflow.model.graph.ws.WSPort; +import org.apache.airavata.workflow.model.wf.Workflow; +import org.apache.airavata.simple.workflow.engine.WorkflowParser; +import org.apache.airavata.simple.workflow.engine.dag.edge.DirectedEdge; +import org.apache.airavata.simple.workflow.engine.dag.edge.Edge; +import org.apache.airavata.simple.workflow.engine.dag.nodes.ApplicationNode; +import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowNode; +import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowOutputNode; +import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowOutputNodeImpl; +import org.apache.airavata.simple.workflow.engine.dag.port.InPort; +import org.apache.airavata.simple.workflow.engine.dag.port.InputPortIml; +import org.apache.airavata.simple.workflow.engine.dag.port.OutPort; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class AiravataWorkflowParser implements WorkflowParser { + + private String credentialToken ; + + private Experiment experiment; + private Map<String, WorkflowNode> wfNodes = new HashMap<String, WorkflowNode>(); + + + public AiravataWorkflowParser(String experimentId, String credentialToken) throws RegistryException { + this.experiment = getExperiment(experimentId); + this.credentialToken = credentialToken; + } + + public AiravataWorkflowParser(Experiment experiment, String credentialToken) { + this.credentialToken = credentialToken; + this.experiment = experiment; + } + + @Override + public List<WorkflowInputNode> parse() throws RegistryException, AppCatalogException, + ComponentException, GraphException { + return parseWorkflow(getWorkflowFromExperiment(experiment)); + } + + public List<WorkflowInputNode> parseWorkflow(Workflow workflow) { + List<Node> gNodes = getInputNodes(workflow); + List<WorkflowInputNode> wfInputNodes = new ArrayList<WorkflowInputNode>(); + List<PortContainer> portContainers = new ArrayList<PortContainer>(); + List<InputDataObjectType> experimentInputs = experiment.getExperimentInputs(); + Map<String,InputDataObjectType> inputDataMap=new HashMap<String, InputDataObjectType>(); + WorkflowInputNode wfInputNode = null; + for (InputDataObjectType dataObjectType : experimentInputs) { + inputDataMap.put(dataObjectType.getName(), dataObjectType); + } + for (Node gNode : gNodes) { + wfInputNode = new WorkflowInputNodeImpl(gNode.getID(), gNode.getName()); + wfInputNode.setInputObject(inputDataMap.get(wfInputNode.getName())); + if (wfInputNode.getInputObject() == null) { + // TODO: throw an error and exit. + } + portContainers.addAll(processOutPorts(gNode, wfInputNode)); + wfInputNodes.add(wfInputNode); + } + + // while port container is not empty iterate graph and build the workflow DAG. + buildModel(portContainers); + + return wfInputNodes; + } + + private void buildModel(List<PortContainer> portContainerList) { + // end condition of recursive call. + if (portContainerList == null || portContainerList.isEmpty()) { + return ; + } + DataPort dataPort = null; + InPort inPort = null; + ApplicationNode wfApplicationNode = null; + WorkflowOutputNode wfOutputNode = null; + List<PortContainer> nextPortContainerList = new ArrayList<PortContainer>(); + for (PortContainer portContainer : portContainerList) { + dataPort = portContainer.getDataPort(); + inPort = portContainer.getInPort(); + Node node = dataPort.getNode(); + if (node instanceof WSNode) { + WSNode wsNode = (WSNode) node; + WorkflowNode wfNode = wfNodes.get(wsNode.getID()); + if (wfNode == null) { + wfApplicationNode = createApplicationNode(wsNode); + wfNodes.put(wfApplicationNode.getId(), wfApplicationNode); + nextPortContainerList.addAll(processOutPorts(wsNode, wfApplicationNode)); + } else if (wfNode instanceof ApplicationNode) { + wfApplicationNode = (ApplicationNode) wfNode; + } else { + // TODO : handle this scenario + } + inPort.setNode(wfApplicationNode); + wfApplicationNode.addInPort(inPort); + + }else if (node instanceof OutputNode) { + OutputNode oNode = (OutputNode) node; + wfOutputNode = createWorkflowOutputNode(oNode); + wfOutputNode.setInPort(inPort); + inPort.setNode(wfOutputNode); + wfNodes.put(wfOutputNode.getId(), wfOutputNode); + } + } + buildModel(nextPortContainerList); + + } + + private WorkflowOutputNode createWorkflowOutputNode(OutputNode oNode) { + WorkflowOutputNodeImpl workflowOutputNode = new WorkflowOutputNodeImpl(oNode.getID(), oNode.getName()); + OutputDataObjectType outputDataObjectType = new OutputDataObjectType(); + outputDataObjectType.setType(oNode.getParameterType()); + workflowOutputNode.setOutputObject(outputDataObjectType); + return workflowOutputNode; + } + + private ApplicationNode createApplicationNode(WSNode wsNode) { + ApplicationNode applicationNode = new ApplicationNodeImpl(wsNode.getID(), + wsNode.getComponent().getApplication().getName(), + wsNode.getComponent().getApplication().getApplicationId()); + return applicationNode; + } + + private List<PortContainer> processOutPorts(Node node, WorkflowNode wfNode) { + OutPort outPort ; + Edge edge; + InPort inPort = null; + List<PortContainer> portContainers = new ArrayList<PortContainer>(); + for (DataPort dataPort : node.getOutputPorts()) { + outPort = createOutPort(dataPort); + for (DataEdge dataEdge : dataPort.getEdges()) { + edge = new DirectedEdge(); + edge.setFromPort(outPort); + outPort.addEdge(edge); + inPort = createInPort(dataEdge.getToPort()); + edge.setToPort(inPort); + inPort.addEdge(edge); + portContainers.add(new PortContainer(dataEdge.getToPort(), inPort)); + } + outPort.setNode(wfNode); + if (wfNode instanceof WorkflowInputNode) { + WorkflowInputNode workflowInputNode = (WorkflowInputNode) wfNode; + workflowInputNode.setOutPort(outPort); + } else if (wfNode instanceof ApplicationNode) { + ApplicationNode applicationNode = ((ApplicationNode) wfNode); + applicationNode.addOutPort(outPort); + } + } + return portContainers; + } + + private OutPort createOutPort(DataPort dataPort) { + OutPortImpl outPort = new OutPortImpl(dataPort.getID()); + OutputDataObjectType outputDataObjectType = new OutputDataObjectType(); + if (dataPort instanceof WSPort) { + WSPort wsPort = (WSPort) dataPort; + outputDataObjectType.setName(wsPort.getComponentPort().getName()); + outputDataObjectType.setType(wsPort.getType()); + }else if (dataPort instanceof SystemDataPort) { + SystemDataPort sysPort = (SystemDataPort) dataPort; + outputDataObjectType.setName(sysPort.getFromNode().getName()); + outputDataObjectType.setType(sysPort.getType()); + } + + outPort.setOutputObject(outputDataObjectType); + return outPort; + } + + private InPort createInPort(DataPort toPort) { + InPort inPort = new InputPortIml(toPort.getID()); + InputDataObjectType inputDataObjectType = new InputDataObjectType(); + if (toPort instanceof WSPort) { + WSPort wsPort = (WSPort) toPort; + inputDataObjectType.setName(wsPort.getName()); + inputDataObjectType.setType(wsPort.getType()); + inputDataObjectType.setApplicationArgument(wsPort.getComponentPort().getApplicationArgument()); + inputDataObjectType.setIsRequired(!wsPort.getComponentPort().isOptional()); + inputDataObjectType.setInputOrder(wsPort.getComponentPort().getInputOrder()); + + inPort.setDefaultValue(wsPort.getComponentPort().getDefaultValue()); + }else if (toPort instanceof SystemDataPort) { + SystemDataPort sysPort = (SystemDataPort) toPort; + inputDataObjectType.setName(sysPort.getName()); + inputDataObjectType.setType(sysPort.getType()); + } + inPort.setInputObject(inputDataObjectType); + return inPort; + } + + private InputDataObjectType getInputDataObject(DataPort dataPort) { + InputDataObjectType inputDataObject = new InputDataObjectType(); + inputDataObject.setName(dataPort.getName()); + if (dataPort instanceof WSPort) { + WSPort port = (WSPort) dataPort; + inputDataObject.setInputOrder(port.getComponentPort().getInputOrder()); + inputDataObject.setApplicationArgument(port.getComponentPort().getApplicationArgument() == null ? + "" : port.getComponentPort().getApplicationArgument()); + inputDataObject.setType(dataPort.getType()); + } + return inputDataObject; + } + + private OutputDataObjectType getOutputDataObject(InputDataObjectType inputObject) { + OutputDataObjectType outputDataObjectType = new OutputDataObjectType(); + outputDataObjectType.setApplicationArgument(inputObject.getApplicationArgument()); + outputDataObjectType.setName(inputObject.getName()); + outputDataObjectType.setType(inputObject.getType()); + outputDataObjectType.setValue(inputObject.getValue()); + return outputDataObjectType; + } + + private Experiment getExperiment(String experimentId) throws RegistryException { + Registry registry = RegistryFactory.getDefaultRegistry(); + return (Experiment)registry.get(RegistryModelType.EXPERIMENT, experimentId); + } + + private Workflow getWorkflowFromExperiment(Experiment experiment) throws RegistryException, AppCatalogException, GraphException, ComponentException { + WorkflowCatalog workflowCatalog = getWorkflowCatalog(); + return new Workflow(workflowCatalog.getWorkflow(experiment.getApplicationId()).getGraph()); + } + + private WorkflowCatalog getWorkflowCatalog() throws AppCatalogException { + return AppCatalogFactory.getAppCatalog().getWorkflowCatalog(); + } + + private ArrayList<Node> getInputNodes(Workflow wf) { + ArrayList<Node> list = new ArrayList<Node>(); + List<NodeImpl> nodes = wf.getGraph().getNodes(); + for (Node node : nodes) { + String name = node.getComponent().getName(); + if (InputComponent.NAME.equals(name) || ConstantComponent.NAME.equals(name) || S3InputComponent.NAME.equals(name)) { + list.add(node); + } + } + return list; + } + + public Map<String, WorkflowNode> getWfNodes() { + return wfNodes; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/modules/simple-workflow/src/test/java/org/apache/airavata/simple/workflow/engine/parser/AiravataDefaultParserTest.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/test/java/org/apache/airavata/simple/workflow/engine/parser/AiravataDefaultParserTest.java b/modules/simple-workflow/src/test/java/org/apache/airavata/simple/workflow/engine/parser/AiravataDefaultParserTest.java deleted file mode 100644 index e9b3e55..0000000 --- a/modules/simple-workflow/src/test/java/org/apache/airavata/simple/workflow/engine/parser/AiravataDefaultParserTest.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.airavata.simple.workflow.engine.parser; - -import org.apache.airavata.model.appcatalog.appinterface.DataType; -import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; -import org.apache.airavata.model.workspace.experiment.Experiment; -import org.apache.airavata.simple.workflow.engine.dag.nodes.ApplicationNode; -import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowInputNode; -import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowOutputNode; -import org.apache.airavata.workflow.model.wf.Workflow; -import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowNode; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import java.io.BufferedReader; -import java.io.InputStreamReader; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -public class AiravataDefaultParserTest { - - @Before - public void setUp() throws Exception { - - } - - @After - public void tearDown() throws Exception { - - } - - @Test - public void testWorkflowParse() throws Exception { - Assert.assertNotNull("Test file (ComplexMathWorkflow.awf) is missing", getClass().getResource("/ComplexMathWorkflow.awf")); - InputStreamReader isr = new InputStreamReader(this.getClass().getResourceAsStream("/ComplexMathWorkflow.awf")); - BufferedReader br = new BufferedReader(isr); - StringBuffer sb = new StringBuffer(); - String nextLine = br.readLine(); - while (nextLine != null) { - sb.append(nextLine); - nextLine = br.readLine(); - } - Workflow workflow = new Workflow(sb.toString()); - Experiment experiment = new Experiment(); - InputDataObjectType x = new InputDataObjectType(); - x.setValue("6"); - x.setType(DataType.STRING); - x.setName("x"); - - InputDataObjectType y = new InputDataObjectType(); - y.setValue("8"); - y.setType(DataType.STRING); - y.setName("y"); - - InputDataObjectType z = new InputDataObjectType(); - z.setValue("10"); - z.setType(DataType.STRING); - z.setName("y_2"); - - List<InputDataObjectType> inputs = new ArrayList<InputDataObjectType>(); - inputs.add(x); - inputs.add(y); - inputs.add(z); - experiment.setExperimentInputs(inputs); - // create parser - AiravataDefaultParser parser = new AiravataDefaultParser(experiment, "testCredentialId"); - List<WorkflowInputNode> workflowInputNodes = parser.parseWorkflow(workflow); - Assert.assertNotNull(workflowInputNodes); - Assert.assertEquals(3, workflowInputNodes.size()); - for (WorkflowInputNode workflowInputNode : workflowInputNodes) { - Assert.assertNotNull(workflowInputNode.getOutPort()); - Assert.assertNotNull(workflowInputNode.getInputObject()); - } - - Map<String, WorkflowNode> wfNodes = parser.getWfNodes(); - for (String wfId : wfNodes.keySet()) { - WorkflowNode wfNode = wfNodes.get(wfId); - if (wfNode instanceof ApplicationNode) { - ApplicationNode node = (ApplicationNode) wfNode; - Assert.assertEquals(2, node.getInputPorts().size()); - Assert.assertNotNull(node.getInputPorts().get(0).getInputObject()); - Assert.assertNotNull(node.getInputPorts().get(1).getInputObject()); - Assert.assertNotNull(node.getInputPorts().get(0).getEdge()); - Assert.assertNotNull(node.getInputPorts().get(1).getEdge()); - - Assert.assertEquals(1, node.getOutputPorts().size()); - Assert.assertEquals(1, node.getOutputPorts().get(0).getOutEdges().size()); - Assert.assertNotNull(node.getOutputPorts().get(0).getOutEdges().get(0)); - } else if (wfNode instanceof WorkflowOutputNode) { - WorkflowOutputNode workflowOutputNode = (WorkflowOutputNode) wfNode; - Assert.assertNotNull(workflowOutputNode.getInPort()); - } - } - - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/modules/simple-workflow/src/test/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParserTest.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/test/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParserTest.java b/modules/simple-workflow/src/test/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParserTest.java new file mode 100644 index 0000000..6443806 --- /dev/null +++ b/modules/simple-workflow/src/test/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParserTest.java @@ -0,0 +1,119 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.simple.workflow.engine.parser; + +import org.apache.airavata.model.appcatalog.appinterface.DataType; +import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; +import org.apache.airavata.model.workspace.experiment.Experiment; +import org.apache.airavata.simple.workflow.engine.dag.nodes.ApplicationNode; +import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowInputNode; +import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowOutputNode; +import org.apache.airavata.workflow.model.wf.Workflow; +import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowNode; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class AiravataWorkflowParserTest { + + @Before + public void setUp() throws Exception { + + } + + @After + public void tearDown() throws Exception { + + } + + @Test + public void testWorkflowParse() throws Exception { + Assert.assertNotNull("Test file (ComplexMathWorkflow.awf) is missing", getClass().getResource("/ComplexMathWorkflow.awf")); + InputStreamReader isr = new InputStreamReader(this.getClass().getResourceAsStream("/ComplexMathWorkflow.awf")); + BufferedReader br = new BufferedReader(isr); + StringBuffer sb = new StringBuffer(); + String nextLine = br.readLine(); + while (nextLine != null) { + sb.append(nextLine); + nextLine = br.readLine(); + } + Workflow workflow = new Workflow(sb.toString()); + Experiment experiment = new Experiment(); + InputDataObjectType x = new InputDataObjectType(); + x.setValue("6"); + x.setType(DataType.STRING); + x.setName("x"); + + InputDataObjectType y = new InputDataObjectType(); + y.setValue("8"); + y.setType(DataType.STRING); + y.setName("y"); + + InputDataObjectType z = new InputDataObjectType(); + z.setValue("10"); + z.setType(DataType.STRING); + z.setName("y_2"); + + List<InputDataObjectType> inputs = new ArrayList<InputDataObjectType>(); + inputs.add(x); + inputs.add(y); + inputs.add(z); + experiment.setExperimentInputs(inputs); + // create parser + AiravataWorkflowParser parser = new AiravataWorkflowParser(experiment, "testCredentialId"); + List<WorkflowInputNode> workflowInputNodes = parser.parseWorkflow(workflow); + Assert.assertNotNull(workflowInputNodes); + Assert.assertEquals(3, workflowInputNodes.size()); + for (WorkflowInputNode workflowInputNode : workflowInputNodes) { + Assert.assertNotNull(workflowInputNode.getOutPort()); + Assert.assertNotNull(workflowInputNode.getInputObject()); + } + + Map<String, WorkflowNode> wfNodes = parser.getWfNodes(); + for (String wfId : wfNodes.keySet()) { + WorkflowNode wfNode = wfNodes.get(wfId); + if (wfNode instanceof ApplicationNode) { + ApplicationNode node = (ApplicationNode) wfNode; + Assert.assertEquals(2, node.getInputPorts().size()); + Assert.assertNotNull(node.getInputPorts().get(0).getInputObject()); + Assert.assertNotNull(node.getInputPorts().get(1).getInputObject()); + Assert.assertNotNull(node.getInputPorts().get(0).getEdge()); + Assert.assertNotNull(node.getInputPorts().get(1).getEdge()); + + Assert.assertEquals(1, node.getOutputPorts().size()); + Assert.assertEquals(1, node.getOutputPorts().get(0).getOutEdges().size()); + Assert.assertNotNull(node.getOutputPorts().get(0).getOutEdges().get(0)); + } else if (wfNode instanceof WorkflowOutputNode) { + WorkflowOutputNode workflowOutputNode = (WorkflowOutputNode) wfNode; + Assert.assertNotNull(workflowOutputNode.getInPort()); + } + } + + } +} \ No newline at end of file
