Renamed the wrong package name
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/6bfb9563 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/6bfb9563 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/6bfb9563 Branch: refs/heads/master Commit: 6bfb9563a049b943c2ab2287f468a38f8eaf9b78 Parents: 8835097 Author: shamrath <[email protected]> Authored: Mon Feb 23 16:51:21 2015 -0500 Committer: shamrath <[email protected]> Committed: Mon Feb 23 16:51:21 2015 -0500 ---------------------------------------------------------------------- .../simple/workflow/engine/ProcessPack.java | 62 +++ .../engine/SimpleWorkflowInterpreter.java | 470 +++++++++++++++++++ .../simple/workflow/engine/WorkflowFactory.java | 31 ++ .../workflow/engine/WorkflowFactoryImpl.java | 66 +++ .../simple/workflow/engine/WorkflowParser.java | 32 ++ .../simple/workflow/engine/WorkflowUtil.java | 63 +++ .../workflow/engine/dag/edge/DirectedEdge.java | 52 ++ .../simple/workflow/engine/dag/edge/Edge.java | 43 ++ .../engine/dag/nodes/ApplicationNode.java | 41 ++ .../engine/dag/nodes/ApplicationNodeImpl.java | 113 +++++ .../workflow/engine/dag/nodes/NodeState.java | 34 ++ .../workflow/engine/dag/nodes/NodeType.java | 28 ++ .../engine/dag/nodes/WorkflowInputNode.java | 37 ++ .../engine/dag/nodes/WorkflowInputNodeImpl.java | 96 ++++ .../workflow/engine/dag/nodes/WorkflowNode.java | 38 ++ .../engine/dag/nodes/WorkflowOutputNode.java | 37 ++ .../dag/nodes/WorkflowOutputNodeImpl.java | 97 ++++ .../simple/workflow/engine/dag/port/InPort.java | 41 ++ .../workflow/engine/dag/port/InputPortIml.java | 90 ++++ .../workflow/engine/dag/port/OutPort.java | 39 ++ .../workflow/engine/dag/port/OutPortImpl.java | 83 ++++ .../simple/workflow/engine/dag/port/Port.java | 36 ++ .../engine/parser/AiravataDefaultParser.java | 293 ++++++++++++ .../workflow/engine/parser/PortContainer.java | 53 +++ .../simple/workflow/engine/ProcessPack.java | 62 --- .../engine/SimpleWorkflowInterpreter.java | 470 ------------------- .../simple/workflow/engine/WorkflowFactory.java | 31 -- .../workflow/engine/WorkflowFactoryImpl.java | 66 --- .../simple/workflow/engine/WorkflowParser.java | 32 -- .../simple/workflow/engine/WorkflowUtil.java | 63 --- .../workflow/engine/dag/edge/DirectedEdge.java | 52 -- .../simple/workflow/engine/dag/edge/Edge.java | 43 -- .../engine/dag/nodes/ApplicationNode.java | 41 -- .../engine/dag/nodes/ApplicationNodeImpl.java | 113 ----- .../workflow/engine/dag/nodes/NodeState.java | 34 -- .../workflow/engine/dag/nodes/NodeType.java | 28 -- .../engine/dag/nodes/WorkflowInputNode.java | 37 -- .../engine/dag/nodes/WorkflowInputNodeImpl.java | 96 ---- .../workflow/engine/dag/nodes/WorkflowNode.java | 38 -- .../engine/dag/nodes/WorkflowOutputNode.java | 37 -- .../dag/nodes/WorkflowOutputNodeImpl.java | 97 ---- .../simple/workflow/engine/dag/port/InPort.java | 41 -- .../workflow/engine/dag/port/InputPortIml.java | 90 ---- .../workflow/engine/dag/port/OutPort.java | 39 -- .../workflow/engine/dag/port/OutPortImpl.java | 83 ---- .../simple/workflow/engine/dag/port/Port.java | 36 -- .../engine/parser/AiravataDefaultParser.java | 293 ------------ .../workflow/engine/parser/PortContainer.java | 53 --- .../simple/workflow/engine/WorkflowDAGTest.java | 46 ++ .../parser/AiravataDefaultParserTest.java | 119 +++++ .../simple/workflow/engine/WorkflowDAGTest.java | 46 -- .../parser/AiravataDefaultParserTest.java | 119 ----- 52 files changed, 2140 insertions(+), 2140 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/ProcessPack.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/ProcessPack.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/ProcessPack.java new file mode 100644 index 0000000..b58b947 --- /dev/null +++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/ProcessPack.java @@ -0,0 +1,62 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.simple.workflow.engine; + +import org.apache.airavata.model.workspace.experiment.TaskDetails; +import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails; +import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowNode; + +public class ProcessPack { + private WorkflowNode workflowNode; + private WorkflowNodeDetails wfNodeDetails; + private TaskDetails taskDetails; + + public ProcessPack(WorkflowNode workflowNode, WorkflowNodeDetails wfNodeDetails, TaskDetails taskDetails) { + this.workflowNode = workflowNode; + this.wfNodeDetails = wfNodeDetails; + this.taskDetails = taskDetails; + } + + public WorkflowNode getWorkflowNode() { + return workflowNode; + } + + public void setWorkflowNode(WorkflowNode workflowNode) { + this.workflowNode = workflowNode; + } + + public WorkflowNodeDetails getWfNodeDetails() { + return wfNodeDetails; + } + + public void setWfNodeDetails(WorkflowNodeDetails wfNodeDetails) { + this.wfNodeDetails = wfNodeDetails; + } + + public TaskDetails getTaskDetails() { + return taskDetails; + } + + public void setTaskDetails(TaskDetails taskDetails) { + this.taskDetails = taskDetails; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java new file mode 100644 index 0000000..6dcb8bd --- /dev/null +++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java @@ -0,0 +1,470 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.simple.workflow.engine; + +import com.google.common.eventbus.EventBus; +import com.google.common.eventbus.Subscribe; +import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; +import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType; +import org.apache.airavata.model.messaging.event.TaskIdentifier; +import org.apache.airavata.model.messaging.event.TaskOutputChangeEvent; +import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent; +import org.apache.airavata.model.util.ExperimentModelUtil; +import org.apache.airavata.model.workspace.experiment.ExecutionUnit; +import org.apache.airavata.model.workspace.experiment.Experiment; +import org.apache.airavata.model.workspace.experiment.TaskDetails; +import org.apache.airavata.model.workspace.experiment.TaskState; +import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails; +import org.apache.airavata.model.workspace.experiment.WorkflowNodeState; +import org.apache.airavata.model.workspace.experiment.WorkflowNodeStatus; +import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory; +import org.apache.airavata.registry.cpi.ChildDataType; +import org.apache.airavata.registry.cpi.Registry; +import org.apache.airavata.registry.cpi.RegistryException; +import org.apache.airavata.registry.cpi.RegistryModelType; +import org.apache.airavata.simple.workflow.engine.dag.edge.Edge; +import org.apache.airavata.simple.workflow.engine.dag.nodes.ApplicationNode; +import org.apache.airavata.simple.workflow.engine.dag.nodes.NodeState; +import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowInputNode; +import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowOutputNode; +import org.apache.airavata.simple.workflow.engine.parser.AiravataDefaultParser; +import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowNode; +import org.apache.airavata.simple.workflow.engine.dag.port.InPort; +import org.apache.airavata.simple.workflow.engine.dag.port.OutPort; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +public class SimpleWorkflowInterpreter implements Runnable{ + + private static final Logger log = LoggerFactory.getLogger(SimpleWorkflowInterpreter.class); + + private List<WorkflowInputNode> workflowInputNodes; + + private Experiment experiment; + + private String credentialToken; + + private Map<String, WorkflowNode> readList = new ConcurrentHashMap<String, WorkflowNode>(); + private Map<String, WorkflowNode> waitingList = new ConcurrentHashMap<String, WorkflowNode>(); + private Map<String, ProcessPack> processingQueue = new ConcurrentHashMap<String, ProcessPack>(); + private Map<String, ProcessPack> completeList = new HashMap<String, ProcessPack>(); + private Registry registry; + private EventBus eventBus = new EventBus(); + private List<WorkflowOutputNode> completeWorkflowOutputs = new ArrayList<WorkflowOutputNode>(); + + public SimpleWorkflowInterpreter(String experimentId, String credentialToken) throws RegistryException { + setExperiment(experimentId); + this.credentialToken = credentialToken; + } + + public SimpleWorkflowInterpreter(Experiment experiment, String credentialStoreToken) { + // read the workflow file and build the topology to a DAG. Then execute that dag + // get workflowInputNode list and start processing + // next() will return ready task and block the thread if no task in ready state. + this.experiment = experiment; + this.credentialToken = credentialStoreToken; + } + + + public void launchWorkflow() throws Exception { + // process workflow input nodes +// WorkflowFactoryImpl wfFactory = WorkflowFactoryImpl.getInstance(); +// WorkflowParser workflowParser = wfFactory.getWorkflowParser(experiment.getExperimentID(), credentialToken); + WorkflowParser workflowParser = new AiravataDefaultParser(experiment, credentialToken); + log.debug("Initialized workflow parser"); + setWorkflowInputNodes(workflowParser.parse()); + log.debug("Parsed the workflow and got the workflow input nodes"); + processWorkflowInputNodes(getWorkflowInputNodes()); + } + + // try to remove synchronization tag + private synchronized void processReadyList() { + for (WorkflowNode readyNode : readList.values()) { + try { + if (readyNode instanceof WorkflowOutputNode) { + WorkflowOutputNode wfOutputNode = (WorkflowOutputNode) readyNode; + wfOutputNode.getOutputObject().setValue(wfOutputNode.getInPort().getInputObject().getValue()); + addToCompleteOutputNodeList(wfOutputNode); + continue; + } + WorkflowNodeDetails workflowNodeDetails = createWorkflowNodeDetails(readyNode); + TaskDetails process = getProcess(workflowNodeDetails); + ProcessPack processPack = new ProcessPack(readyNode, workflowNodeDetails, process); + addToProcessingQueue(processPack); +// publishToProcessQueue(process); + publishToProcessQueue(processPack); + } catch (RegistryException e) { + // FIXME : handle this exception + } + } + } + + + private void publishToProcessQueue(TaskDetails process) { + Thread thread = new Thread(new TempPublisher(process, eventBus)); + thread.start(); + //TODO: publish to process queue. + } + + // TODO : remove this test method + private void publishToProcessQueue(ProcessPack process) { + WorkflowNode workflowNode = process.getWorkflowNode(); + if (workflowNode instanceof ApplicationNode) { + ApplicationNode applicationNode = (ApplicationNode) workflowNode; + List<InPort> inputPorts = applicationNode.getInputPorts(); + if (applicationNode.getName().equals("Add")) { + applicationNode.getOutputPorts().get(0).getOutputObject().setValue(String.valueOf( + Integer.parseInt(inputPorts.get(0).getInputObject().getValue()) + Integer.parseInt(inputPorts.get(1).getInputObject().getValue()))); + } else if (applicationNode.getName().equals("Multiply")) { + applicationNode.getOutputPorts().get(0).getOutputObject().setValue(String.valueOf( + Integer.parseInt(inputPorts.get(0).getInputObject().getValue()) * Integer.parseInt(inputPorts.get(1).getInputObject().getValue()))); + } else if (applicationNode.getName().equals("Subtract")) { + applicationNode.getOutputPorts().get(0).getOutputObject().setValue(String.valueOf( + Integer.parseInt(inputPorts.get(0).getInputObject().getValue()) - Integer.parseInt(inputPorts.get(1).getInputObject().getValue()))); + } else { + throw new RuntimeException("Invalid Application name"); + } + + for (Edge edge : applicationNode.getOutputPorts().get(0).getOutEdges()) { + WorkflowUtil.copyValues(applicationNode.getOutputPorts().get(0).getOutputObject(), edge.getToPort().getInputObject()); + if (edge.getToPort().getNode().isReady()) { + addToReadyQueue(edge.getToPort().getNode()); + } else { + addToWaitingQueue(edge.getToPort().getNode()); + } + } + } else if (workflowNode instanceof WorkflowOutputNode) { + WorkflowOutputNode wfOutputNode = (WorkflowOutputNode) workflowNode; + throw new RuntimeException("Workflow output node in processing queue"); + } + + processingQueue.remove(process.getTaskDetails().getTaskID()); + } + + private TaskDetails getProcess(WorkflowNodeDetails wfNodeDetails) throws RegistryException { + // create workflow taskDetails from workflowNodeDetails + TaskDetails taskDetails = ExperimentModelUtil.cloneTaskFromWorkflowNodeDetails(getExperiment(), wfNodeDetails); + taskDetails.setTaskID(getRegistry() + .add(ChildDataType.TASK_DETAIL, taskDetails, wfNodeDetails.getNodeInstanceId()).toString()); + return taskDetails; + } + + private WorkflowNodeDetails createWorkflowNodeDetails(WorkflowNode readyNode) throws RegistryException { + WorkflowNodeDetails wfNodeDetails = ExperimentModelUtil.createWorkflowNode(readyNode.getId(), null); + ExecutionUnit executionUnit = ExecutionUnit.APPLICATION; + String executionData = null; + if (readyNode instanceof ApplicationNode) { + executionUnit = ExecutionUnit.APPLICATION; + executionData = ((ApplicationNode) readyNode).getApplicationId(); + } else if (readyNode instanceof WorkflowInputNode) { + executionUnit = ExecutionUnit.INPUT; + } else if (readyNode instanceof WorkflowOutputNode) { + executionUnit = ExecutionUnit.OUTPUT; + } + wfNodeDetails.setExecutionUnit(executionUnit); + wfNodeDetails.setExecutionUnitData(executionData); + setupNodeDetailsInput(readyNode, wfNodeDetails); + wfNodeDetails.setNodeInstanceId((String) getRegistry() + .add(ChildDataType.WORKFLOW_NODE_DETAIL, wfNodeDetails, getExperiment().getExperimentID())); +// nodeInstanceList.put(node, wfNodeDetails); + return wfNodeDetails; + } + + private void setupNodeDetailsInput(WorkflowNode readyNode, WorkflowNodeDetails wfNodeDetails) { + if (readyNode instanceof ApplicationNode) { + ApplicationNode applicationNode = (ApplicationNode) readyNode; + if (applicationNode.isReady()) { + for (InPort inPort : applicationNode.getInputPorts()) { + wfNodeDetails.addToNodeInputs(inPort.getInputObject()); + } + } else { + // TODO: handle this scenario properly. + } + } else { + // TODO: do we support for other type of workflow nodes ? + } + } + + + private void processWorkflowInputNodes(List<WorkflowInputNode> wfInputNodes) { + Set<WorkflowNode> tempNodeSet = new HashSet<WorkflowNode>(); + for (WorkflowInputNode wfInputNode : wfInputNodes) { + if (wfInputNode.isReady()) { + log.debug("Workflow node : " + wfInputNode.getId() + " is ready to execute"); + for (Edge edge : wfInputNode.getOutPort().getOutEdges()) { + edge.getToPort().getInputObject().setValue(wfInputNode.getInputObject().getValue()); + if (edge.getToPort().getNode().isReady()) { + addToReadyQueue(edge.getToPort().getNode()); + log.debug("Added workflow node : " + edge.getToPort().getNode().getId() + " to the readyQueue"); + } else { + addToWaitingQueue(edge.getToPort().getNode()); + log.debug("Added workflow node " + edge.getToPort().getNode().getId() + " to the waitingQueue"); + + } + } + } + } + } + + + public List<WorkflowInputNode> getWorkflowInputNodes() throws Exception { + return workflowInputNodes; + } + + public void setWorkflowInputNodes(List<WorkflowInputNode> workflowInputNodes) { + this.workflowInputNodes = workflowInputNodes; + } + + + private List<WorkflowInputNode> parseWorkflowDescription(){ + return null; + } + + + private Registry getRegistry() throws RegistryException { + if (registry==null){ + registry = RegistryFactory.getDefaultRegistry(); + } + return registry; + } + + public Experiment getExperiment() { + return experiment; + } + + private void updateWorkflowNodeStatus(WorkflowNodeDetails wfNodeDetails, WorkflowNodeState state) throws RegistryException{ + WorkflowNodeStatus status = ExperimentModelUtil.createWorkflowNodeStatus(state); + wfNodeDetails.setWorkflowNodeStatus(status); + getRegistry().update(RegistryModelType.WORKFLOW_NODE_STATUS, status, wfNodeDetails.getNodeInstanceId()); + } + + @Subscribe + public void taskOutputChanged(TaskOutputChangeEvent taskOutputEvent){ + String taskId = taskOutputEvent.getTaskIdentity().getTaskId(); + log.debug("Task Output changed event received for workflow node : " + + taskOutputEvent.getTaskIdentity().getWorkflowNodeId() + ", task : " + taskId); + ProcessPack processPack = processingQueue.get(taskId); + Set<WorkflowNode> tempWfNodeSet = new HashSet<WorkflowNode>(); + if (processPack != null) { + WorkflowNode workflowNode = processPack.getWorkflowNode(); + if (workflowNode instanceof ApplicationNode) { + ApplicationNode applicationNode = (ApplicationNode) workflowNode; + // Workflow node can have one to many output ports and each output port can have one to many links + for (OutPort outPort : applicationNode.getOutputPorts()) { + for (OutputDataObjectType outputDataObjectType : taskOutputEvent.getOutput()) { + if (outPort.getOutputObject().getName().equals(outputDataObjectType.getName())) { + outPort.getOutputObject().setValue(outputDataObjectType.getValue()); + break; + } + } + for (Edge edge : outPort.getOutEdges()) { + WorkflowUtil.copyValues(outPort.getOutputObject(), edge.getToPort().getInputObject()); + if (edge.getToPort().getNode().isReady()) { + addToReadyQueue(edge.getToPort().getNode()); + } + } + } + } + processingQueue.remove(taskId); + log.debug("removed task from processing queue : " + taskId); + } + + } + + @Subscribe + public void taskStatusChanged(TaskStatusChangeEvent taskStatus){ + String taskId = taskStatus.getTaskIdentity().getTaskId(); + ProcessPack processPack = processingQueue.get(taskId); + if (processPack != null) { + WorkflowNodeState wfNodeState = WorkflowNodeState.UNKNOWN; + switch (taskStatus.getState()) { + case WAITING: + break; + case STARTED: + break; + case PRE_PROCESSING: + processPack.getWorkflowNode().setState(NodeState.PRE_PROCESSING); + break; + case INPUT_DATA_STAGING: + processPack.getWorkflowNode().setState(NodeState.PRE_PROCESSING); + break; + case EXECUTING: + processPack.getWorkflowNode().setState(NodeState.EXECUTING); + break; + case OUTPUT_DATA_STAGING: + processPack.getWorkflowNode().setState(NodeState.POST_PROCESSING); + break; + case POST_PROCESSING: + processPack.getWorkflowNode().setState(NodeState.POST_PROCESSING); + break; + case COMPLETED: + processPack.getWorkflowNode().setState(NodeState.EXECUTED); + break; + case FAILED: + processPack.getWorkflowNode().setState(NodeState.FAILED); + break; + case UNKNOWN: + break; + case CONFIGURING_WORKSPACE: + break; + case CANCELED: + case CANCELING: + processPack.getWorkflowNode().setState(NodeState.FAILED); + break; + default: + break; + } + if (wfNodeState != WorkflowNodeState.UNKNOWN) { + try { + updateWorkflowNodeStatus(processPack.getWfNodeDetails(), wfNodeState); + } catch (RegistryException e) { + // TODO: handle this. + } + } + } + + } + + /** + * Remove the workflow node from waiting queue and add it to the ready queue. + * @param workflowNode - Workflow Node + */ + private synchronized void addToReadyQueue(WorkflowNode workflowNode) { + waitingList.remove(workflowNode.getId()); + readList.put(workflowNode.getId(), workflowNode); + } + + private void addToWaitingQueue(WorkflowNode workflowNode) { + waitingList.put(workflowNode.getId(), workflowNode); + } + + /** + * First remove the node from ready list and then add the WfNodeContainer to the process queue. + * Note that underline data structure of the process queue is a Map. + * @param processPack - has both workflow and correspond workflowNodeDetails and TaskDetails + */ + private synchronized void addToProcessingQueue(ProcessPack processPack) { + readList.remove(processPack.getWorkflowNode().getId()); + processingQueue.put(processPack.getTaskDetails().getTaskID(), processPack); + } + + private synchronized void addToCompleteQueue(ProcessPack processPack) { + processingQueue.remove(processPack.getTaskDetails().getTaskID()); + completeList.put(processPack.getTaskDetails().getTaskID(), processPack); + } + + + private void addToCompleteOutputNodeList(WorkflowOutputNode wfOutputNode) { + completeWorkflowOutputs.add(wfOutputNode); + readList.remove(wfOutputNode.getId()); + } + + @Override + public void run() { + // TODO: Auto generated method body. + try { + log.debug("Launching workflow"); + launchWorkflow(); + while (!(waitingList.isEmpty() && readList.isEmpty())) { + processReadyList(); + Thread.sleep(1000); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + private void setExperiment(String experimentId) throws RegistryException { + experiment = (Experiment) getRegistry().get(RegistryModelType.EXPERIMENT, experimentId); + log.debug("Retrieve Experiment for experiment id : " + experimentId); + } + + + class TempPublisher implements Runnable { + private TaskDetails tempTaskDetails; + private EventBus tempEventBus; + + public TempPublisher(TaskDetails tempTaskDetails, EventBus tempEventBus) { + this.tempTaskDetails = tempTaskDetails; + this.tempEventBus = tempEventBus; + } + + @Override + public void run() { + try { + TaskIdentifier identifier = new TaskIdentifier(tempTaskDetails.getTaskID(), null, null, null); + TaskStatusChangeEvent statusChangeEvent = new TaskStatusChangeEvent(TaskState.PRE_PROCESSING, identifier); + tempEventBus.post(statusChangeEvent); + Thread.sleep(1000); + statusChangeEvent = new TaskStatusChangeEvent(TaskState.WAITING, identifier); + tempEventBus.post(statusChangeEvent); + Thread.sleep(1000); + statusChangeEvent = new TaskStatusChangeEvent(TaskState.INPUT_DATA_STAGING, identifier); + tempEventBus.post(statusChangeEvent); + Thread.sleep(1000); + statusChangeEvent = new TaskStatusChangeEvent(TaskState.STARTED, identifier); + tempEventBus.post(statusChangeEvent); + Thread.sleep(1000); + statusChangeEvent = new TaskStatusChangeEvent(TaskState.EXECUTING, identifier); + tempEventBus.post(statusChangeEvent); + Thread.sleep(1000); + statusChangeEvent = new TaskStatusChangeEvent(TaskState.POST_PROCESSING, identifier); + tempEventBus.post(statusChangeEvent); + Thread.sleep(1000); + statusChangeEvent = new TaskStatusChangeEvent(TaskState.OUTPUT_DATA_STAGING, identifier); + tempEventBus.post(statusChangeEvent); + Thread.sleep(1000); + statusChangeEvent = new TaskStatusChangeEvent(TaskState.COMPLETED, identifier); + tempEventBus.post(statusChangeEvent); + Thread.sleep(1000); + + List<InputDataObjectType> applicationInputs = tempTaskDetails.getApplicationInputs(); + List<OutputDataObjectType> applicationOutputs = tempTaskDetails.getApplicationOutputs(); + log.info("************** Task output change event fired for application id :" + tempTaskDetails.getApplicationId()); + if (tempTaskDetails.getApplicationId().equals("Add") || tempTaskDetails.getApplicationId().equals("Add_2")) { + applicationOutputs.get(0).setValue((Integer.parseInt(applicationInputs.get(0).getValue()) + + Integer.parseInt(applicationInputs.get(1).getValue())) + ""); + } else if (tempTaskDetails.getApplicationId().equals("Subtract")) { + applicationOutputs.get(0).setValue((Integer.parseInt(applicationInputs.get(0).getValue()) - + Integer.parseInt(applicationInputs.get(1).getValue())) + ""); + } else if (tempTaskDetails.getApplicationId().equals("Multiply")) { + applicationOutputs.get(0).setValue((Integer.parseInt(applicationInputs.get(0).getValue()) * + Integer.parseInt(applicationInputs.get(1).getValue())) + ""); + } + TaskOutputChangeEvent taskOutputChangeEvent = new TaskOutputChangeEvent(applicationOutputs, identifier); + eventBus.post(taskOutputChangeEvent); + + } catch (InterruptedException e) { + log.error("Thread was interrupted while sleeping"); + } + + } + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactory.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactory.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactory.java new file mode 100644 index 0000000..3de90f2 --- /dev/null +++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactory.java @@ -0,0 +1,31 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.simple.workflow.engine; + +/** + * All classes implement this WorkflowFactory interface, should be abstract or singleton. + */ +public interface WorkflowFactory { + + public WorkflowParser getWorkflowParser(String experimentId, String credentialToken); + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactoryImpl.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactoryImpl.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactoryImpl.java new file mode 100644 index 0000000..116a10d --- /dev/null +++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactoryImpl.java @@ -0,0 +1,66 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.simple.workflow.engine; + +import org.apache.airavata.registry.cpi.RegistryException; +import org.apache.airavata.simple.workflow.engine.parser.AiravataDefaultParser; + +/** + * Singleton class, only one instance can exist in runtime. + */ +public class WorkflowFactoryImpl implements WorkflowFactory { + + private static WorkflowFactoryImpl workflowFactoryImpl; + + private WorkflowParser workflowParser; + + private static final String synch = "sync"; + + private WorkflowFactoryImpl(){ + + } + + public static WorkflowFactoryImpl getInstance() { + if (workflowFactoryImpl == null) { + synchronized (synch) { + if (workflowFactoryImpl == null) { + workflowFactoryImpl = new WorkflowFactoryImpl(); + } + } + } + return workflowFactoryImpl; + } + + + @Override + public WorkflowParser getWorkflowParser(String experimentId, String credentialToken) { + if (workflowParser == null) { + try { + workflowParser = new AiravataDefaultParser(experimentId, credentialToken); + } catch (RegistryException e) { + // TODO : handle this scenario + } + } + return workflowParser; + } + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowParser.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowParser.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowParser.java new file mode 100644 index 0000000..6c4d6f2 --- /dev/null +++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowParser.java @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.simple.workflow.engine; + +import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowInputNode; + +import java.util.List; + +public interface WorkflowParser { + + public List<WorkflowInputNode> parse() throws Exception; + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowUtil.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowUtil.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowUtil.java new file mode 100644 index 0000000..a2b69ae --- /dev/null +++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowUtil.java @@ -0,0 +1,63 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.simple.workflow.engine; + +import com.google.common.eventbus.EventBus; +import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; +import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType; +import org.apache.airavata.model.messaging.event.TaskIdentifier; +import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent; +import org.apache.airavata.model.workspace.experiment.TaskDetails; +import org.apache.airavata.model.workspace.experiment.TaskState; +import org.apache.airavata.persistance.registry.jpa.model.TaskDetail; + +public class WorkflowUtil { + + public static InputDataObjectType copyValues(InputDataObjectType fromInputObj, InputDataObjectType toInputObj){ + if (toInputObj == null) { + // TODO : throw an error + } + toInputObj.setValue(fromInputObj.getValue()); + if (fromInputObj.getApplicationArgument() != null + && !fromInputObj.getApplicationArgument().trim().equals("")) { + toInputObj.setApplicationArgument(fromInputObj.getApplicationArgument()); + } + if (toInputObj.getType() == null) { + toInputObj.setType(fromInputObj.getType()); + } + return fromInputObj; + } + + public static InputDataObjectType copyValues(OutputDataObjectType outputData, InputDataObjectType inputData) { + inputData.setValue(outputData.getValue()); + return inputData; + } + + + public static OutputDataObjectType copyValues(InputDataObjectType inputObject, OutputDataObjectType outputObject) { + if (outputObject == null) { + outputObject = new OutputDataObjectType(); + } + outputObject.setValue(inputObject.getValue()); + return outputObject; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/edge/DirectedEdge.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/edge/DirectedEdge.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/edge/DirectedEdge.java new file mode 100644 index 0000000..3bc380d --- /dev/null +++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/edge/DirectedEdge.java @@ -0,0 +1,52 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.simple.workflow.engine.dag.edge; + +import org.apache.airavata.simple.workflow.engine.dag.port.OutPort; +import org.apache.airavata.simple.workflow.engine.dag.port.InPort; + + +public class DirectedEdge implements Edge { + + private InPort inPort; + private OutPort outPort; + + @Override + public InPort getToPort() { + return inPort; + } + + @Override + public void setToPort(InPort inPort) { + this.inPort = inPort; + } + + @Override + public OutPort getFromPort() { + return outPort; + } + + @Override + public void setFromPort(OutPort outPort) { + this.outPort = outPort; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/edge/Edge.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/edge/Edge.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/edge/Edge.java new file mode 100644 index 0000000..e8bce2e --- /dev/null +++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/edge/Edge.java @@ -0,0 +1,43 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.simple.workflow.engine.dag.edge; + +import org.apache.airavata.simple.workflow.engine.dag.port.InPort; +import org.apache.airavata.simple.workflow.engine.dag.port.OutPort; + +/** + * Edge is a link to one node to another, basically edge should have outPort of a workflow node , + * which is starting point and inPort of a workflow node, which is end point of the edge. + */ + +public interface Edge { + + public InPort getToPort(); + + public void setToPort(InPort inPort); + + public OutPort getFromPort(); + + public void setFromPort(OutPort outPort); + + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/ApplicationNode.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/ApplicationNode.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/ApplicationNode.java new file mode 100644 index 0000000..37efded --- /dev/null +++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/ApplicationNode.java @@ -0,0 +1,41 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.simple.workflow.engine.dag.nodes; + +import org.apache.airavata.simple.workflow.engine.dag.port.InPort; +import org.apache.airavata.simple.workflow.engine.dag.port.OutPort; + +import java.util.List; + +public interface ApplicationNode extends WorkflowNode { + + public String getApplicationId(); + + public void addInPort(InPort inPort); + + public List<InPort> getInputPorts(); + + public void addOutPort(OutPort outPort); + + public List<OutPort> getOutputPorts(); + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/ApplicationNodeImpl.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/ApplicationNodeImpl.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/ApplicationNodeImpl.java new file mode 100644 index 0000000..52b0595 --- /dev/null +++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/ApplicationNodeImpl.java @@ -0,0 +1,113 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.simple.workflow.engine.dag.nodes; + +import org.apache.airavata.simple.workflow.engine.dag.port.InPort; +import org.apache.airavata.simple.workflow.engine.dag.port.OutPort; + +import java.util.ArrayList; +import java.util.List; + +public class ApplicationNodeImpl implements ApplicationNode { + + private final String nodeId; + private NodeState myState = NodeState.WAITING; + private String applicationId; + private List<InPort> inPorts = new ArrayList<InPort>(); + private List<OutPort> outPorts = new ArrayList<OutPort>(); + private String applicationName; + +// public ApplicationNodeImpl(String nodeId) { +// this(nodeId, null); +// } +// +// public ApplicationNodeImpl(String nodeId, String applicationId) { +// this(nodeId, null, applicationId); +// } + + public ApplicationNodeImpl(String nodeId, String applicationName, String applicationId) { + this.nodeId = nodeId; + this.applicationName = applicationName; + this.applicationId = applicationId; + } + + @Override + public String getId() { + return this.nodeId; + } + + @Override + public String getName() { + return applicationName; + } + + @Override + public NodeType getType() { + return NodeType.APPLICATION; + } + + @Override + public NodeState getState() { + return myState; + } + + @Override + public void setState(NodeState newState) { + // TODO: node state can't be reversed , correct order WAITING --> READY --> EXECUTING --> EXECUTED --> COMPLETE + myState = newState; + } + + @Override + public boolean isReady() { + for (InPort inPort : getInputPorts()) { + if (!inPort.isReady()) { + return false; + } + } + return true; + } + + @Override + public String getApplicationId() { + return this.applicationId; + } + + @Override + public void addInPort(InPort inPort) { + this.inPorts.add(inPort); + } + + @Override + public List<InPort> getInputPorts() { + return this.inPorts; + } + + @Override + public void addOutPort(OutPort outPort) { + this.outPorts.add(outPort); + } + + @Override + public List<OutPort> getOutputPorts() { + return this.outPorts; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/NodeState.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/NodeState.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/NodeState.java new file mode 100644 index 0000000..333fcb2 --- /dev/null +++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/NodeState.java @@ -0,0 +1,34 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.simple.workflow.engine.dag.nodes; + +public enum NodeState { + WAITING, // waiting on inputs + READY, // all inputs are available and ready to execute + QUEUED, // + PRE_PROCESSING, // + EXECUTING, // task has been submitted , not yet finish + EXECUTED, // task executed + POST_PROCESSING, // + FAILED, + COMPLETE // all works done +} http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/NodeType.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/NodeType.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/NodeType.java new file mode 100644 index 0000000..95710fb --- /dev/null +++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/NodeType.java @@ -0,0 +1,28 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.simple.workflow.engine.dag.nodes; + +public enum NodeType { + APPLICATION, + WORKFLOW_INPUT, + WORKFLOW_OUTPUT +} http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowInputNode.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowInputNode.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowInputNode.java new file mode 100644 index 0000000..9ac800a --- /dev/null +++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowInputNode.java @@ -0,0 +1,37 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.simple.workflow.engine.dag.nodes; + +import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; +import org.apache.airavata.simple.workflow.engine.dag.port.OutPort; + +public interface WorkflowInputNode extends WorkflowNode { + + public InputDataObjectType getInputObject(); + + public void setInputObject(InputDataObjectType inputObject); + + public OutPort getOutPort(); + + public void setOutPort(OutPort outPort); + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java new file mode 100644 index 0000000..b3dfa62 --- /dev/null +++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.airavata.simple.workflow.engine.dag.nodes; + +import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; +import org.apache.airavata.simple.workflow.engine.dag.port.OutPort; + +public class WorkflowInputNodeImpl implements WorkflowInputNode { + + private NodeState myState = NodeState.READY; + private final String nodeId; + private String nodeName; + private OutPort outPort; + private InputDataObjectType inputDataObjectType; + private String name; + + public WorkflowInputNodeImpl(String nodeId) { + this(nodeId, null); + } + + public WorkflowInputNodeImpl(String nodeId, String nodeName) { + this.nodeId = nodeId; + this.nodeName = nodeName; + } + + @Override + public String getId() { + return this.nodeId; + } + + @Override + public String getName() { + return this.nodeName; + } + + @Override + public NodeType getType() { + return NodeType.WORKFLOW_INPUT; + } + + @Override + public NodeState getState() { + return myState; + } + + @Override + public void setState(NodeState newState) { + // TODO: node state can't be reversed , correct order WAITING --> READY --> EXECUTING --> EXECUTED --> COMPLETE + myState = newState; + } + + @Override + public boolean isReady() { + return (inputDataObjectType.getValue() != null && !inputDataObjectType.getValue().equals("")) + || !inputDataObjectType.isIsRequired(); + } + + @Override + public InputDataObjectType getInputObject() { + return this.inputDataObjectType; + } + + @Override + public void setInputObject(InputDataObjectType inputObject) { + this.inputDataObjectType = inputObject; + } + + @Override + public OutPort getOutPort() { + return this.outPort; + } + + @Override + public void setOutPort(OutPort outPort) { + this.outPort = outPort; + } + + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowNode.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowNode.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowNode.java new file mode 100644 index 0000000..efcf9c7 --- /dev/null +++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowNode.java @@ -0,0 +1,38 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.simple.workflow.engine.dag.nodes; + +public interface WorkflowNode { + + public String getId(); + + public String getName(); + + public NodeType getType(); + + public NodeState getState(); + + public void setState(NodeState newState); + + public boolean isReady(); + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowOutputNode.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowOutputNode.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowOutputNode.java new file mode 100644 index 0000000..14e4519 --- /dev/null +++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowOutputNode.java @@ -0,0 +1,37 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.simple.workflow.engine.dag.nodes; + +import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType; +import org.apache.airavata.simple.workflow.engine.dag.port.InPort; + +public interface WorkflowOutputNode extends WorkflowNode { + + public OutputDataObjectType getOutputObject(); + + public void setOutputObject(OutputDataObjectType outputObject); + + public InPort getInPort(); + + public void setInPort(InPort inPort); + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowOutputNodeImpl.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowOutputNodeImpl.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowOutputNodeImpl.java new file mode 100644 index 0000000..5924212 --- /dev/null +++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowOutputNodeImpl.java @@ -0,0 +1,97 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.simple.workflow.engine.dag.nodes; + +import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType; +import org.apache.airavata.simple.workflow.engine.dag.port.InPort; + +public class WorkflowOutputNodeImpl implements WorkflowOutputNode { + + private NodeState myState = NodeState.WAITING; + private final String nodeId; + private String nodeName; + private OutputDataObjectType outputDataObjectType; + private InPort inPort; + + public WorkflowOutputNodeImpl(String nodeId) { + this(nodeId, null); + } + + public WorkflowOutputNodeImpl(String nodeId, String nodeName) { + this.nodeId = nodeId; + this.nodeName = nodeName; + } + + @Override + public String getId() { + return this.nodeId; + } + + @Override + public String getName() { + return this.nodeName; + } + + @Override + public NodeType getType() { + return NodeType.WORKFLOW_OUTPUT; + } + + @Override + public NodeState getState() { + return myState; + } + + @Override + public void setState(NodeState newState) { + // TODO: node state can't be reversed , correct order WAITING --> READY --> EXECUTING --> EXECUTED --> COMPLETE + myState = newState; + } + + @Override + public boolean isReady() { + return !(inPort.getInputObject() == null || inPort.getInputObject().getValue() == null + || inPort.getInputObject().getValue().equals("")); + } + + @Override + public OutputDataObjectType getOutputObject() { + return this.outputDataObjectType; + } + + @Override + public void setOutputObject(OutputDataObjectType outputObject) { + this.outputDataObjectType = outputObject; + } + + @Override + public InPort getInPort() { + return this.inPort; + } + + @Override + public void setInPort(InPort inPort) { + this.inPort = inPort; + } + +} + http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/InPort.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/InPort.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/InPort.java new file mode 100644 index 0000000..bb4a112 --- /dev/null +++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/InPort.java @@ -0,0 +1,41 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.simple.workflow.engine.dag.port; + +import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; +import org.apache.airavata.simple.workflow.engine.dag.edge.Edge; + +public interface InPort extends Port { + + public void setInputObject(InputDataObjectType inputObject); + + public InputDataObjectType getInputObject(); + + public Edge getEdge(); + + public void addEdge(Edge edge); + + public String getDefaultValue(); + + public void setDefaultValue(String defaultValue); + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/InputPortIml.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/InputPortIml.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/InputPortIml.java new file mode 100644 index 0000000..c78dc86 --- /dev/null +++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/InputPortIml.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.airavata.simple.workflow.engine.dag.port; + +import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; +import org.apache.airavata.simple.workflow.engine.dag.edge.Edge; +import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowNode; + +public class InputPortIml implements InPort { + + private InputDataObjectType inputDataObjectType; + private boolean ready = false; + private String portId; + private Edge edge; + private WorkflowNode node; + private String defaultValue; + + public InputPortIml(String portId) { + this.portId = portId; + } + + @Override + public void setInputObject(InputDataObjectType inputObject) { + this.inputDataObjectType = inputObject; + ready = (inputDataObjectType.getValue() != null && !inputDataObjectType.getValue().equals("")) + || !inputDataObjectType.isIsRequired(); + } + + @Override + public InputDataObjectType getInputObject() { + return this.inputDataObjectType; + } + + @Override + public Edge getEdge() { + return this.edge; + } + + @Override + public void addEdge(Edge edge) { + this.edge = edge; + } + + @Override + public String getDefaultValue() { + return defaultValue; + } + + public void setDefaultValue(String defaultValue) { + this.defaultValue = defaultValue; + } + + @Override + public boolean isReady() { + return getInputObject() != null && inputDataObjectType.getValue() != null && !inputDataObjectType.getValue().equals(""); + } + + @Override + public WorkflowNode getNode() { + return this.node; + } + + @Override + public void setNode(WorkflowNode workflowNode) { + this.node = workflowNode; + } + + @Override + public String getId() { + return this.portId; + } + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/OutPort.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/OutPort.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/OutPort.java new file mode 100644 index 0000000..0332f81 --- /dev/null +++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/OutPort.java @@ -0,0 +1,39 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.simple.workflow.engine.dag.port; + +import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType; +import org.apache.airavata.simple.workflow.engine.dag.edge.Edge; + +import java.util.List; + +public interface OutPort extends Port { + + public void setOutputObject(OutputDataObjectType outputObject); + + public OutputDataObjectType getOutputObject(); + + public List<Edge> getOutEdges(); + + public void addEdge(Edge edge); + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/OutPortImpl.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/OutPortImpl.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/OutPortImpl.java new file mode 100644 index 0000000..4e26cb3 --- /dev/null +++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/OutPortImpl.java @@ -0,0 +1,83 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.simple.workflow.engine.dag.port; + +import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType; +import org.apache.airavata.simple.workflow.engine.dag.edge.Edge; +import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowNode; + +import java.util.ArrayList; +import java.util.List; + +public class OutPortImpl implements OutPort { + + private OutputDataObjectType outputDataObjectType; + private List<Edge> outEdges = new ArrayList<Edge>(); + private boolean isSatisfy = false; + private String portId; + private WorkflowNode node; + + public OutPortImpl(String portId) { + this.portId = portId; + } + + @Override + public void setOutputObject(OutputDataObjectType outputObject) { + this.outputDataObjectType = outputObject; + } + + @Override + public OutputDataObjectType getOutputObject() { + return this.outputDataObjectType; + } + + @Override + public List<Edge> getOutEdges() { + return this.outEdges; + } + + @Override + public void addEdge(Edge edge) { + this.outEdges.add(edge); + } + + @Override + public boolean isReady() { + return this.outputDataObjectType.getValue() != null + && !this.outputDataObjectType.getValue().equals(""); + } + + @Override + public WorkflowNode getNode() { + return this.node; + } + + @Override + public void setNode(WorkflowNode workflowNode) { + this.node = workflowNode; + } + + @Override + public String getId() { + return portId; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/Port.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/Port.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/Port.java new file mode 100644 index 0000000..2b27ea0 --- /dev/null +++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/Port.java @@ -0,0 +1,36 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.simple.workflow.engine.dag.port; + +import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowNode; + +public interface Port { + + public boolean isReady(); + + public WorkflowNode getNode(); + + public void setNode(WorkflowNode workflowNode); + + public String getId(); + +}
