http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/pom.xml ---------------------------------------------------------------------- diff --git a/modules/workflow/pom.xml b/modules/workflow/pom.xml new file mode 100644 index 0000000..4fb8e09 --- /dev/null +++ b/modules/workflow/pom.xml @@ -0,0 +1,22 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.airavata</groupId> + <artifactId>airavata</artifactId> + <version>0.15-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + + <artifactId>workflow</artifactId> + <packaging>pom</packaging> + <version>0.15-SNAPSHOT</version> + <name>Airavata Workflow</name> + <modules> + <module>workflow-core</module> + </modules> + +</project> \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/pom.xml ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/pom.xml b/modules/workflow/workflow-core/pom.xml new file mode 100644 index 0000000..1cb8e0d --- /dev/null +++ b/modules/workflow/workflow-core/pom.xml @@ -0,0 +1,74 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>workflow</artifactId> + <groupId>org.apache.airavata</groupId> + <version>0.15-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>workflow-core</artifactId> + <name>Airavata Workflow Core</name> + + <dependencies> + <dependency> + <groupId>org.apache.airavata</groupId> + <artifactId>airavata-data-models</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.airavata</groupId> + <artifactId>airavata-registry-cpi</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.airavata</groupId> + <artifactId>airavata-model-utils</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.airavata</groupId> + <artifactId>airavata-jpa-registry</artifactId> + <version>${project.version}</version> + </dependency> + + <!-- Airavata default parser dependency --> + <dependency> + <groupId>org.apache.airavata</groupId> + <artifactId>airavata-workflow-model-core</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.airavata</groupId> + <artifactId>app-catalog-data</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.airavata</groupId> + <artifactId>app-catalog-cpi</artifactId> + <version>${project.version}</version> + </dependency> + <!-- Messaging dependency --> + <dependency> + <groupId>org.apache.airavata</groupId> + <artifactId>airavata-messaging-core</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>18.0</version> + </dependency> + + <!--test--> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.11</version> + <scope>test</scope> + </dependency> + </dependencies> + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/ProcessContext.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/ProcessContext.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/ProcessContext.java new file mode 100644 index 0000000..e0c2651 --- /dev/null +++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/ProcessContext.java @@ -0,0 +1,62 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.workflow.core; + +import org.apache.airavata.model.workspace.experiment.TaskDetails; +import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails; +import org.apache.airavata.workflow.core.dag.nodes.WorkflowNode; + +public class ProcessContext { + private WorkflowNode workflowNode; + private WorkflowNodeDetails wfNodeDetails; + private TaskDetails taskDetails; + + public ProcessContext(WorkflowNode workflowNode, WorkflowNodeDetails wfNodeDetails, TaskDetails taskDetails) { + this.workflowNode = workflowNode; + this.wfNodeDetails = wfNodeDetails; + this.taskDetails = taskDetails; + } + + public WorkflowNode getWorkflowNode() { + return workflowNode; + } + + public void setWorkflowNode(WorkflowNode workflowNode) { + this.workflowNode = workflowNode; + } + + public WorkflowNodeDetails getWfNodeDetails() { + return wfNodeDetails; + } + + public void setWfNodeDetails(WorkflowNodeDetails wfNodeDetails) { + this.wfNodeDetails = wfNodeDetails; + } + + public TaskDetails getTaskDetails() { + return taskDetails; + } + + public void setTaskDetails(TaskDetails taskDetails) { + this.taskDetails = taskDetails; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/SimpleWorkflowInterpreter.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/SimpleWorkflowInterpreter.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/SimpleWorkflowInterpreter.java new file mode 100644 index 0000000..a674aad --- /dev/null +++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/SimpleWorkflowInterpreter.java @@ -0,0 +1,400 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.workflow.core; + +import org.apache.airavata.common.exception.AiravataException; +import org.apache.airavata.common.utils.AiravataUtils; +import org.apache.airavata.messaging.core.MessageContext; +import org.apache.airavata.messaging.core.impl.RabbitMQProcessPublisher; +import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer; +import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType; +import org.apache.airavata.model.messaging.event.MessageType; +import org.apache.airavata.model.messaging.event.ProcessSubmitEvent; +import org.apache.airavata.model.messaging.event.TaskIdentifier; +import org.apache.airavata.model.messaging.event.TaskOutputChangeEvent; +import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent; +import org.apache.airavata.model.util.ExperimentModelUtil; +import org.apache.airavata.model.workspace.experiment.ExecutionUnit; +import org.apache.airavata.model.workspace.experiment.Experiment; +import org.apache.airavata.model.workspace.experiment.TaskDetails; +import org.apache.airavata.model.workspace.experiment.TaskState; +import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails; +import org.apache.airavata.model.workspace.experiment.WorkflowNodeState; +import org.apache.airavata.model.workspace.experiment.WorkflowNodeStatus; +import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory; +import org.apache.airavata.registry.cpi.ChildDataType; +import org.apache.airavata.registry.cpi.Registry; +import org.apache.airavata.registry.cpi.RegistryException; +import org.apache.airavata.registry.cpi.RegistryModelType; +import org.apache.airavata.workflow.core.dag.edge.Edge; +import org.apache.airavata.workflow.core.dag.nodes.ApplicationNode; +import org.apache.airavata.workflow.core.dag.nodes.NodeState; +import org.apache.airavata.workflow.core.dag.nodes.WorkflowInputNode; +import org.apache.airavata.workflow.core.dag.nodes.WorkflowNode; +import org.apache.airavata.workflow.core.dag.nodes.WorkflowOutputNode; +import org.apache.airavata.workflow.core.dag.port.InPort; +import org.apache.airavata.workflow.core.dag.port.OutPort; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Package-Private class + */ +class SimpleWorkflowInterpreter{ + + private static final Logger log = LoggerFactory.getLogger(SimpleWorkflowInterpreter.class); + private List<WorkflowInputNode> workflowInputNodes; + + private Experiment experiment; + + private String credentialToken; + + private String gatewayName; + + private Map<String, WorkflowNode> readyList = new ConcurrentHashMap<String, WorkflowNode>(); + private Map<String, WorkflowNode> waitingList = new ConcurrentHashMap<String, WorkflowNode>(); + private Map<String, ProcessContext> processingQueue = new ConcurrentHashMap<String, ProcessContext>(); + private Map<String, ProcessContext> completeList = new HashMap<String, ProcessContext>(); + private Registry registry; + private List<WorkflowOutputNode> completeWorkflowOutputs = new ArrayList<WorkflowOutputNode>(); + private RabbitMQProcessPublisher publisher; + private RabbitMQStatusConsumer statusConsumer; + private String consumerId; + private boolean continueWorkflow = true; + + public SimpleWorkflowInterpreter(String experimentId, String credentialToken, String gatewayName, RabbitMQProcessPublisher publisher) throws RegistryException { + this.gatewayName = gatewayName; + setExperiment(experimentId); + this.credentialToken = credentialToken; + this.publisher = publisher; + } + + public SimpleWorkflowInterpreter(Experiment experiment, String credentialStoreToken, String gatewayName, RabbitMQProcessPublisher publisher) { + this.gatewayName = gatewayName; + this.experiment = experiment; + this.credentialToken = credentialStoreToken; + this.publisher = publisher; + } + + /** + * Package-Private method. + * @throws Exception + */ + void launchWorkflow() throws Exception { + WorkflowFactoryImpl wfFactory = WorkflowFactoryImpl.getInstance(); + WorkflowParser workflowParser = wfFactory.getWorkflowParser(experiment.getExperimentID(), credentialToken); + log.debug("Initialized workflow parser"); + setWorkflowInputNodes(workflowParser.parse()); + log.debug("Parsed the workflow and got the workflow input nodes"); + // process workflow input nodes + processWorkflowInputNodes(getWorkflowInputNodes()); + if (readyList.isEmpty()) { + StringBuilder sb = new StringBuilder(); + for (WorkflowInputNode workflowInputNode : workflowInputNodes) { + sb.append(", "); + sb.append(workflowInputNode.getInputObject().getName()); + sb.append("="); + sb.append(workflowInputNode.getInputObject().getValue()); + } + throw new AiravataException("No workflow application node is in ready state to run with experiment inputs" + sb.toString()); + } + processReadyList(); + } + + // try to remove synchronization tag + /** + * Package-Private method. + * @throws RegistryException + * @throws AiravataException + */ + void processReadyList() throws RegistryException, AiravataException { + if (readyList.isEmpty() && processingQueue.isEmpty() && !waitingList.isEmpty()) { + throw new AiravataException("No workflow application node is in ready state to run"); + } + for (WorkflowNode readyNode : readyList.values()) { + if (readyNode instanceof WorkflowOutputNode) { + WorkflowOutputNode wfOutputNode = (WorkflowOutputNode) readyNode; + wfOutputNode.getOutputObject().setValue(wfOutputNode.getInPort().getInputObject().getValue()); + addToCompleteOutputNodeList(wfOutputNode); + continue; + } + WorkflowNodeDetails workflowNodeDetails = createWorkflowNodeDetails(readyNode); + TaskDetails process = getProcess(workflowNodeDetails); + ProcessContext processContext = new ProcessContext(readyNode, workflowNodeDetails, process); + addToProcessingQueue(processContext); + publishToProcessQueue(process); + } + } + + + private void publishToProcessQueue(TaskDetails process) throws AiravataException { + ProcessSubmitEvent processSubmitEvent = new ProcessSubmitEvent(); + processSubmitEvent.setCredentialToken(credentialToken); + processSubmitEvent.setTaskId(process.getTaskID()); + MessageContext messageContext = new MessageContext(processSubmitEvent, MessageType.TASK, process.getTaskID(), null); + messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); + publisher.publish(messageContext); + } + + private TaskDetails getProcess(WorkflowNodeDetails wfNodeDetails) throws RegistryException { + // create workflow taskDetails from workflowNodeDetails + TaskDetails taskDetails = ExperimentModelUtil.cloneTaskFromWorkflowNodeDetails(getExperiment(), wfNodeDetails); + taskDetails.setTaskID(getRegistry() + .add(ChildDataType.TASK_DETAIL, taskDetails, wfNodeDetails.getNodeInstanceId()).toString()); + return taskDetails; + } + + private WorkflowNodeDetails createWorkflowNodeDetails(WorkflowNode readyNode) throws RegistryException { + WorkflowNodeDetails wfNodeDetails = ExperimentModelUtil.createWorkflowNode(readyNode.getId(), null); + ExecutionUnit executionUnit = ExecutionUnit.APPLICATION; + String executionData = null; + if (readyNode instanceof ApplicationNode) { + executionUnit = ExecutionUnit.APPLICATION; + executionData = ((ApplicationNode) readyNode).getApplicationId(); + setupNodeDetailsInput(((ApplicationNode) readyNode), wfNodeDetails); + } else if (readyNode instanceof WorkflowInputNode) { + executionUnit = ExecutionUnit.INPUT; + } else if (readyNode instanceof WorkflowOutputNode) { + executionUnit = ExecutionUnit.OUTPUT; + } + wfNodeDetails.setExecutionUnit(executionUnit); + wfNodeDetails.setExecutionUnitData(executionData); + wfNodeDetails.setNodeInstanceId((String) getRegistry() + .add(ChildDataType.WORKFLOW_NODE_DETAIL, wfNodeDetails, getExperiment().getExperimentID())); + return wfNodeDetails; + } + + private void setupNodeDetailsInput(ApplicationNode readyAppNode, WorkflowNodeDetails wfNodeDetails) { + if (readyAppNode.isReady()) { + for (InPort inPort : readyAppNode.getInputPorts()) { + wfNodeDetails.addToNodeInputs(inPort.getInputObject()); + } + } else { + throw new IllegalArgumentException("Application node should be in ready state to set inputs to the " + + "workflow node details, nodeId = " + readyAppNode.getId()); + } + } + + + private void processWorkflowInputNodes(List<WorkflowInputNode> wfInputNodes) { + Set<WorkflowNode> tempNodeSet = new HashSet<WorkflowNode>(); + for (WorkflowInputNode wfInputNode : wfInputNodes) { + if (wfInputNode.isReady()) { + log.debug("Workflow node : " + wfInputNode.getId() + " is ready to execute"); + for (Edge edge : wfInputNode.getOutPort().getOutEdges()) { + edge.getToPort().getInputObject().setValue(wfInputNode.getInputObject().getValue()); + if (edge.getToPort().getNode().isReady()) { + addToReadyQueue(edge.getToPort().getNode()); + log.debug("Added workflow node : " + edge.getToPort().getNode().getId() + " to the readyQueue"); + } else { + addToWaitingQueue(edge.getToPort().getNode()); + log.debug("Added workflow node " + edge.getToPort().getNode().getId() + " to the waitingQueue"); + + } + } + } + } + } + + + public List<WorkflowInputNode> getWorkflowInputNodes() throws Exception { + return workflowInputNodes; + } + + public void setWorkflowInputNodes(List<WorkflowInputNode> workflowInputNodes) { + this.workflowInputNodes = workflowInputNodes; + } + + private Registry getRegistry() throws RegistryException { + if (registry==null){ + registry = RegistryFactory.getDefaultRegistry(); + } + return registry; + } + + public Experiment getExperiment() { + return experiment; + } + + private void updateWorkflowNodeStatus(WorkflowNodeDetails wfNodeDetails, WorkflowNodeState state) throws RegistryException{ + WorkflowNodeStatus status = ExperimentModelUtil.createWorkflowNodeStatus(state); + wfNodeDetails.setWorkflowNodeStatus(status); + getRegistry().update(RegistryModelType.WORKFLOW_NODE_STATUS, status, wfNodeDetails.getNodeInstanceId()); + } + + /** + * Package-Private method. + * Remove the workflow node from waiting queue and add it to the ready queue. + * @param workflowNode - Workflow Node + */ + synchronized void addToReadyQueue(WorkflowNode workflowNode) { + waitingList.remove(workflowNode.getId()); + readyList.put(workflowNode.getId(), workflowNode); + } + + private void addToWaitingQueue(WorkflowNode workflowNode) { + waitingList.put(workflowNode.getId(), workflowNode); + } + + /** + * First remove the node from ready list and then add the WfNodeContainer to the process queue. + * Note that underline data structure of the process queue is a Map. + * @param processContext - has both workflow and correspond workflowNodeDetails and TaskDetails + */ + private synchronized void addToProcessingQueue(ProcessContext processContext) { + readyList.remove(processContext.getWorkflowNode().getId()); + processingQueue.put(processContext.getTaskDetails().getTaskID(), processContext); + } + + private synchronized void addToCompleteQueue(ProcessContext processContext) { + processingQueue.remove(processContext.getTaskDetails().getTaskID()); + completeList.put(processContext.getTaskDetails().getTaskID(), processContext); + } + + + private void addToCompleteOutputNodeList(WorkflowOutputNode wfOutputNode) { + completeWorkflowOutputs.add(wfOutputNode); + readyList.remove(wfOutputNode.getId()); + } + + boolean isAllDone() { + return !continueWorkflow || (waitingList.isEmpty() && readyList.isEmpty() && processingQueue.isEmpty()); + } + + private void setExperiment(String experimentId) throws RegistryException { + experiment = (Experiment) getRegistry().get(RegistryModelType.EXPERIMENT, experimentId); + log.debug("Retrieve Experiment for experiment id : " + experimentId); + } + + synchronized void handleTaskOutputChangeEvent(TaskOutputChangeEvent taskOutputChangeEvent) { + + String taskId = taskOutputChangeEvent.getTaskIdentity().getTaskId(); + log.debug("Task Output changed event received for workflow node : " + + taskOutputChangeEvent.getTaskIdentity().getWorkflowNodeId() + ", task : " + taskId); + ProcessContext processContext = processingQueue.get(taskId); + Set<WorkflowNode> tempWfNodeSet = new HashSet<WorkflowNode>(); + if (processContext != null) { + WorkflowNode workflowNode = processContext.getWorkflowNode(); + if (workflowNode instanceof ApplicationNode) { + ApplicationNode applicationNode = (ApplicationNode) workflowNode; + // Workflow node can have one to many output ports and each output port can have one to many links + for (OutPort outPort : applicationNode.getOutputPorts()) { + for (OutputDataObjectType outputDataObjectType : taskOutputChangeEvent.getOutput()) { + if (outPort.getOutputObject().getName().equals(outputDataObjectType.getName())) { + outPort.getOutputObject().setValue(outputDataObjectType.getValue()); + break; + } + } + for (Edge edge : outPort.getOutEdges()) { + edge.getToPort().getInputObject().setValue(outPort.getOutputObject().getValue()); + if (edge.getToPort().getNode().isReady()) { + addToReadyQueue(edge.getToPort().getNode()); + } + } + } + } + addToCompleteQueue(processContext); + log.debug("removed task from processing queue : " + taskId); + try { + processReadyList(); + } catch (Exception e) { + log.error("Error while processing ready workflow nodes", e); + continueWorkflow = false; + } + } + } + + void handleTaskStatusChangeEvent(TaskStatusChangeEvent taskStatusChangeEvent) { + TaskState taskState = taskStatusChangeEvent.getState(); + TaskIdentifier taskIdentity = taskStatusChangeEvent.getTaskIdentity(); + String taskId = taskIdentity.getTaskId(); + ProcessContext processContext = processingQueue.get(taskId); + if (processContext != null) { + WorkflowNodeState wfNodeState = WorkflowNodeState.INVOKED; + switch (taskState) { + case WAITING: + break; + case STARTED: + break; + case PRE_PROCESSING: + wfNodeState = WorkflowNodeState.INVOKED; + processContext.getWorkflowNode().setState(NodeState.PRE_PROCESSING); + break; + case INPUT_DATA_STAGING: + wfNodeState = WorkflowNodeState.INVOKED; + processContext.getWorkflowNode().setState(NodeState.PRE_PROCESSING); + break; + case EXECUTING: + wfNodeState = WorkflowNodeState.EXECUTING; + processContext.getWorkflowNode().setState(NodeState.EXECUTING); + break; + case OUTPUT_DATA_STAGING: + wfNodeState = WorkflowNodeState.COMPLETED; + processContext.getWorkflowNode().setState(NodeState.POST_PROCESSING); + break; + case POST_PROCESSING: + wfNodeState = WorkflowNodeState.COMPLETED; + processContext.getWorkflowNode().setState(NodeState.POST_PROCESSING); + break; + case COMPLETED: + wfNodeState = WorkflowNodeState.COMPLETED; + processContext.getWorkflowNode().setState(NodeState.EXECUTED); + break; + case FAILED: + wfNodeState = WorkflowNodeState.FAILED; + processContext.getWorkflowNode().setState(NodeState.FAILED); + break; + case UNKNOWN: + wfNodeState = WorkflowNodeState.UNKNOWN; + break; + case CONFIGURING_WORKSPACE: + wfNodeState = WorkflowNodeState.COMPLETED; + break; + case CANCELED: + case CANCELING: + wfNodeState = WorkflowNodeState.CANCELED; + processContext.getWorkflowNode().setState(NodeState.FAILED); + break; + default: + break; + } + if (wfNodeState != WorkflowNodeState.UNKNOWN) { + try { + updateWorkflowNodeStatus(processContext.getWfNodeDetails(), wfNodeState); + } catch (RegistryException e) { + log.error("Error while updating workflow node status update to the registry. nodeInstanceId :" + + processContext.getWfNodeDetails().getNodeInstanceId() + " status to: " + + processContext.getWfNodeDetails().getWorkflowNodeStatus().toString() , e); + } + } + } + + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java new file mode 100644 index 0000000..7795296 --- /dev/null +++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java @@ -0,0 +1,183 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.workflow.core; + +import org.apache.airavata.common.exception.AiravataException; +import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.messaging.core.MessageContext; +import org.apache.airavata.messaging.core.MessageHandler; +import org.apache.airavata.messaging.core.MessagingConstants; +import org.apache.airavata.messaging.core.impl.RabbitMQProcessPublisher; +import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer; +import org.apache.airavata.model.messaging.event.MessageType; +import org.apache.airavata.model.messaging.event.TaskIdentifier; +import org.apache.airavata.model.messaging.event.TaskOutputChangeEvent; +import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +public class WorkflowEnactmentService { + + private static WorkflowEnactmentService workflowEnactmentService; + private final RabbitMQStatusConsumer statusConsumer; + private String consumerId; + private ExecutorService executor; + private Map<String,SimpleWorkflowInterpreter> workflowMap; + + private WorkflowEnactmentService () throws AiravataException { + executor = Executors.newFixedThreadPool(getThreadPoolSize()); + workflowMap = new ConcurrentHashMap<String, SimpleWorkflowInterpreter>(); + statusConsumer = new RabbitMQStatusConsumer(); + consumerId = statusConsumer.listen(new TaskMessageHandler()); + // register the shutdown hook to un-bind status consumer. + Runtime.getRuntime().addShutdownHook(new EnactmentShutDownHook()); + } + + public static WorkflowEnactmentService getInstance() throws AiravataException { + if (workflowEnactmentService == null) { + synchronized (WorkflowEnactmentService.class) { + if (workflowEnactmentService == null) { + workflowEnactmentService = new WorkflowEnactmentService(); + } + } + } + return workflowEnactmentService; + } + + public void submitWorkflow(String experimentId, + String credentialToken, + String gatewayName, + RabbitMQProcessPublisher publisher) throws Exception { + + SimpleWorkflowInterpreter simpleWorkflowInterpreter = new SimpleWorkflowInterpreter( + experimentId, credentialToken,gatewayName, publisher); + workflowMap.put(experimentId, simpleWorkflowInterpreter); + simpleWorkflowInterpreter.launchWorkflow(); + + } + + private int getThreadPoolSize() { + return ServerSettings.getEnactmentThreadPoolSize(); + } + + private class TaskMessageHandler implements MessageHandler { + + @Override + public Map<String, Object> getProperties() { + Map<String, Object> props = new HashMap<String, Object>(); + String gatewayId = "*"; + String experimentId = "*"; + List<String> routingKeys = new ArrayList<String>(); + routingKeys.add(gatewayId); + routingKeys.add(gatewayId + "." + experimentId); + routingKeys.add(gatewayId + "." + experimentId+ ".*"); + routingKeys.add(gatewayId + "." + experimentId+ ".*.*"); + props.put(MessagingConstants.RABBIT_ROUTING_KEY, routingKeys); + return props; + } + + @Override + public void onMessage(MessageContext msgCtx) { + StatusHandler statusHandler = new StatusHandler(msgCtx); + executor.execute(statusHandler); + } + + + } + + private class StatusHandler implements Runnable{ + private final Logger log = LoggerFactory.getLogger(StatusHandler.class); + + private final MessageContext msgCtx; + + public StatusHandler(MessageContext msgCtx) { + this.msgCtx = msgCtx; + } + + @Override + public void run() { + process(); + } + + private void process() { + String message; + SimpleWorkflowInterpreter simpleWorkflowInterpreter; + if (msgCtx.getType() == MessageType.TASK) { + TaskStatusChangeEvent event = (TaskStatusChangeEvent) msgCtx.getEvent(); + TaskIdentifier taskIdentifier = event.getTaskIdentity(); + simpleWorkflowInterpreter = getInterpreter(taskIdentifier.getExperimentId()); + if (simpleWorkflowInterpreter != null) { + simpleWorkflowInterpreter.handleTaskStatusChangeEvent(event); + } else { + // this happens when Task status messages comes after the Taskoutput messages,as we have worked on + // output changes it is ok to ignore this. + } + message = "Received task output change event , expId : " + taskIdentifier.getExperimentId() + ", taskId : " + taskIdentifier.getTaskId() + ", workflow node Id : " + taskIdentifier.getWorkflowNodeId(); + log.debug(message); + }else if (msgCtx.getType() == MessageType.TASKOUTPUT) { + TaskOutputChangeEvent event = (TaskOutputChangeEvent) msgCtx.getEvent(); + TaskIdentifier taskIdentifier = event.getTaskIdentity(); + simpleWorkflowInterpreter = getInterpreter(taskIdentifier.getExperimentId()); + if (simpleWorkflowInterpreter != null) { + simpleWorkflowInterpreter.handleTaskOutputChangeEvent(event); + if (simpleWorkflowInterpreter.isAllDone()) { + workflowMap.remove(taskIdentifier.getExperimentId()); + } + } else { + throw new IllegalArgumentException("Error while processing TaskOutputChangeEvent, " + + "There is no registered workflow for experiment Id : " + taskIdentifier.getExperimentId()); + } + message = "Received task output change event , expId : " + taskIdentifier.getExperimentId() + ", taskId : " + taskIdentifier.getTaskId() + ", workflow node Id : " + taskIdentifier.getWorkflowNodeId(); + log.debug(message); + } else { + // not interested, ignores + } + } + + private SimpleWorkflowInterpreter getInterpreter(String experimentId){ + return workflowMap.get(experimentId); + } + } + + + private class EnactmentShutDownHook extends Thread { + private final Logger log = LoggerFactory.getLogger(EnactmentShutDownHook.class); + @Override + public void run() { + super.run(); + try { + statusConsumer.stopListen(consumerId); + log.info("Successfully un-binded task status consumer"); + } catch (AiravataException e) { + log.error("Error while un-bind enactment status consumer", e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactory.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactory.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactory.java new file mode 100644 index 0000000..ee89dd9 --- /dev/null +++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactory.java @@ -0,0 +1,31 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.workflow.core; + +/** + * All classes implement this WorkflowFactory interface, should be abstract or singleton. + */ +public interface WorkflowFactory { + + public WorkflowParser getWorkflowParser(String experimentId, String credentialToken) throws Exception; + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactoryImpl.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactoryImpl.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactoryImpl.java new file mode 100644 index 0000000..1df2084 --- /dev/null +++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactoryImpl.java @@ -0,0 +1,74 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.workflow.core; + +import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.workflow.core.parser.AiravataWorkflowParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Constructor; + +/** + * Singleton class, only one instance can exist in runtime. + */ +public class WorkflowFactoryImpl implements WorkflowFactory { + + private static final Logger log = LoggerFactory.getLogger(WorkflowFactoryImpl.class); + + private static WorkflowFactoryImpl workflowFactoryImpl; + + private WorkflowFactoryImpl(){ + + } + + public static WorkflowFactoryImpl getInstance() { + if (workflowFactoryImpl == null) { + synchronized (WorkflowFactory.class) { + if (workflowFactoryImpl == null) { + workflowFactoryImpl = new WorkflowFactoryImpl(); + } + } + } + return workflowFactoryImpl; + } + + + @Override + public WorkflowParser getWorkflowParser(String experimentId, String credentialToken) throws Exception { + WorkflowParser workflowParser = null; + try { + String wfParserClassName = ServerSettings.getWorkflowParser(); + Class<?> aClass = Class.forName(wfParserClassName); + Constructor<?> constructor = aClass.getConstructor(String.class, String.class); + workflowParser = (WorkflowParser) constructor.newInstance(experimentId, credentialToken); + } catch (ApplicationSettingsException e) { + log.info("A custom workflow parser is not defined, Use default Airavata workflow parser"); + } + if (workflowParser == null) { + workflowParser = new AiravataWorkflowParser(experimentId, credentialToken); + } + return workflowParser; + } + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowParser.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowParser.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowParser.java new file mode 100644 index 0000000..8d284dd --- /dev/null +++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowParser.java @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.workflow.core; + +import org.apache.airavata.workflow.core.dag.nodes.WorkflowInputNode; + +import java.util.List; + +public interface WorkflowParser { + + public List<WorkflowInputNode> parse() throws Exception; + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/edge/DirectedEdge.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/edge/DirectedEdge.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/edge/DirectedEdge.java new file mode 100644 index 0000000..91118cc --- /dev/null +++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/edge/DirectedEdge.java @@ -0,0 +1,52 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.workflow.core.dag.edge; + +import org.apache.airavata.workflow.core.dag.port.InPort; +import org.apache.airavata.workflow.core.dag.port.OutPort; + + +public class DirectedEdge implements Edge { + + private InPort inPort; + private OutPort outPort; + + @Override + public InPort getToPort() { + return inPort; + } + + @Override + public void setToPort(InPort inPort) { + this.inPort = inPort; + } + + @Override + public OutPort getFromPort() { + return outPort; + } + + @Override + public void setFromPort(OutPort outPort) { + this.outPort = outPort; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/edge/Edge.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/edge/Edge.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/edge/Edge.java new file mode 100644 index 0000000..ee11371 --- /dev/null +++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/edge/Edge.java @@ -0,0 +1,43 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.workflow.core.dag.edge; + +import org.apache.airavata.workflow.core.dag.port.InPort; +import org.apache.airavata.workflow.core.dag.port.OutPort; + +/** + * Edge is a link to one node to another, basically edge should have outPort of a workflow node , + * which is starting point and inPort of a workflow node, which is end point of the edge. + */ + +public interface Edge { + + public InPort getToPort(); + + public void setToPort(InPort inPort); + + public OutPort getFromPort(); + + public void setFromPort(OutPort outPort); + + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/ApplicationNode.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/ApplicationNode.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/ApplicationNode.java new file mode 100644 index 0000000..d775bf4 --- /dev/null +++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/ApplicationNode.java @@ -0,0 +1,41 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.workflow.core.dag.nodes; + +import org.apache.airavata.workflow.core.dag.port.InPort; +import org.apache.airavata.workflow.core.dag.port.OutPort; + +import java.util.List; + +public interface ApplicationNode extends WorkflowNode { + + public String getApplicationId(); + + public void addInPort(InPort inPort); + + public List<InPort> getInputPorts(); + + public void addOutPort(OutPort outPort); + + public List<OutPort> getOutputPorts(); + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/ApplicationNodeImpl.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/ApplicationNodeImpl.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/ApplicationNodeImpl.java new file mode 100644 index 0000000..ad7bd63 --- /dev/null +++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/ApplicationNodeImpl.java @@ -0,0 +1,116 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.workflow.core.dag.nodes; + +import org.apache.airavata.workflow.core.dag.port.InPort; +import org.apache.airavata.workflow.core.dag.port.OutPort; + +import java.util.ArrayList; +import java.util.List; + +public class ApplicationNodeImpl implements ApplicationNode { + + private final String nodeId; + private NodeState myState = NodeState.WAITING; + private String applicationId; + private List<InPort> inPorts = new ArrayList<InPort>(); + private List<OutPort> outPorts = new ArrayList<OutPort>(); + private String applicationName; + +// public ApplicationNodeImpl(String nodeId) { +// this(nodeId, null); +// } +// +// public ApplicationNodeImpl(String nodeId, String applicationId) { +// this(nodeId, null, applicationId); +// } + + public ApplicationNodeImpl(String nodeId, String applicationName, String applicationId) { + this.nodeId = nodeId; + this.applicationName = applicationName; + this.applicationId = applicationId; + } + + @Override + public String getId() { + return this.nodeId; + } + + @Override + public String getName() { + return applicationName; + } + + @Override + public NodeType getType() { + return NodeType.APPLICATION; + } + + @Override + public NodeState getState() { + return myState; + } + + @Override + public void setState(NodeState newState) { + if (newState.getLevel() > myState.getLevel()) { + myState = newState; + } else { + throw new IllegalStateException("Node state can't be reversed. currentState : " + myState.toString() + " , newState " + newState.toString()); + } + } + + @Override + public boolean isReady() { + for (InPort inPort : getInputPorts()) { + if (!inPort.isReady()) { + return false; + } + } + return true; + } + + @Override + public String getApplicationId() { + return this.applicationId; + } + + @Override + public void addInPort(InPort inPort) { + this.inPorts.add(inPort); + } + + @Override + public List<InPort> getInputPorts() { + return this.inPorts; + } + + @Override + public void addOutPort(OutPort outPort) { + this.outPorts.add(outPort); + } + + @Override + public List<OutPort> getOutputPorts() { + return this.outPorts; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/NodeState.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/NodeState.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/NodeState.java new file mode 100644 index 0000000..df2c87a --- /dev/null +++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/NodeState.java @@ -0,0 +1,44 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.workflow.core.dag.nodes; + +public enum NodeState { + WAITING(0), // waiting on inputs + READY(1), // all inputs are available and ready to execute + QUEUED(2), // + PRE_PROCESSING(3), // + EXECUTING(4), // task has been submitted , not yet finish + EXECUTED(5), // task executed + POST_PROCESSING(6), // + FAILED(7), + COMPLETE(8); // all works done + + private int level; + + NodeState(int level) { + this.level = level; + } + + public int getLevel() { + return level; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/NodeType.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/NodeType.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/NodeType.java new file mode 100644 index 0000000..04e4c07 --- /dev/null +++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/NodeType.java @@ -0,0 +1,28 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.workflow.core.dag.nodes; + +public enum NodeType { + APPLICATION, + WORKFLOW_INPUT, + WORKFLOW_OUTPUT +} http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowInputNode.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowInputNode.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowInputNode.java new file mode 100644 index 0000000..83bcacf --- /dev/null +++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowInputNode.java @@ -0,0 +1,37 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.workflow.core.dag.nodes; + +import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; +import org.apache.airavata.workflow.core.dag.port.OutPort; + +public interface WorkflowInputNode extends WorkflowNode { + + public InputDataObjectType getInputObject(); + + public void setInputObject(InputDataObjectType inputObject); + + public OutPort getOutPort(); + + public void setOutPort(OutPort outPort); + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowInputNodeImpl.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowInputNodeImpl.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowInputNodeImpl.java new file mode 100644 index 0000000..2364b03 --- /dev/null +++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowInputNodeImpl.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.airavata.workflow.core.dag.nodes; + +import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; +import org.apache.airavata.workflow.core.dag.port.OutPort; + +public class WorkflowInputNodeImpl implements WorkflowInputNode { + + private NodeState myState = NodeState.READY; + private final String nodeId; + private String nodeName; + private OutPort outPort; + private InputDataObjectType inputDataObjectType; + private String name; + + public WorkflowInputNodeImpl(String nodeId) { + this(nodeId, null); + } + + public WorkflowInputNodeImpl(String nodeId, String nodeName) { + this.nodeId = nodeId; + this.nodeName = nodeName; + } + + @Override + public String getId() { + return this.nodeId; + } + + @Override + public String getName() { + return this.nodeName; + } + + @Override + public NodeType getType() { + return NodeType.WORKFLOW_INPUT; + } + + @Override + public NodeState getState() { + return myState; + } + + @Override + public void setState(NodeState newState) { + if (newState.getLevel() > myState.getLevel()) { + myState = newState; + } else { + throw new IllegalStateException("Node state can't be reversed. currentState : " + myState.toString() + " , newState " + newState.toString()); + } + } + + @Override + public boolean isReady() { + return (inputDataObjectType.getValue() != null && !inputDataObjectType.getValue().equals("")) + || !inputDataObjectType.isIsRequired(); + } + + @Override + public InputDataObjectType getInputObject() { + return this.inputDataObjectType; + } + + @Override + public void setInputObject(InputDataObjectType inputObject) { + this.inputDataObjectType = inputObject; + } + + @Override + public OutPort getOutPort() { + return this.outPort; + } + + @Override + public void setOutPort(OutPort outPort) { + this.outPort = outPort; + } + + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowNode.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowNode.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowNode.java new file mode 100644 index 0000000..e86a740 --- /dev/null +++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowNode.java @@ -0,0 +1,38 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.workflow.core.dag.nodes; + +public interface WorkflowNode { + + public String getId(); + + public String getName(); + + public NodeType getType(); + + public NodeState getState(); + + public void setState(NodeState newState); + + public boolean isReady(); + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowOutputNode.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowOutputNode.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowOutputNode.java new file mode 100644 index 0000000..340ac30 --- /dev/null +++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowOutputNode.java @@ -0,0 +1,37 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.workflow.core.dag.nodes; + +import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType; +import org.apache.airavata.workflow.core.dag.port.InPort; + +public interface WorkflowOutputNode extends WorkflowNode { + + public OutputDataObjectType getOutputObject(); + + public void setOutputObject(OutputDataObjectType outputObject); + + public InPort getInPort(); + + public void setInPort(InPort inPort); + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowOutputNodeImpl.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowOutputNodeImpl.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowOutputNodeImpl.java new file mode 100644 index 0000000..294109f --- /dev/null +++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowOutputNodeImpl.java @@ -0,0 +1,100 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.workflow.core.dag.nodes; + +import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType; +import org.apache.airavata.workflow.core.dag.port.InPort; + +public class WorkflowOutputNodeImpl implements WorkflowOutputNode { + + private NodeState myState = NodeState.WAITING; + private final String nodeId; + private String nodeName; + private OutputDataObjectType outputDataObjectType; + private InPort inPort; + + public WorkflowOutputNodeImpl(String nodeId) { + this(nodeId, null); + } + + public WorkflowOutputNodeImpl(String nodeId, String nodeName) { + this.nodeId = nodeId; + this.nodeName = nodeName; + } + + @Override + public String getId() { + return this.nodeId; + } + + @Override + public String getName() { + return this.nodeName; + } + + @Override + public NodeType getType() { + return NodeType.WORKFLOW_OUTPUT; + } + + @Override + public NodeState getState() { + return myState; + } + + @Override + public void setState(NodeState newState) { + if (newState.getLevel() > myState.getLevel()) { + myState = newState; + } else { + throw new IllegalStateException("Node state can't be reversed. currentState : " + myState.toString() + " , newState " + newState.toString()); + } + } + + @Override + public boolean isReady() { + return !(inPort.getInputObject() == null || inPort.getInputObject().getValue() == null + || inPort.getInputObject().getValue().equals("")); + } + + @Override + public OutputDataObjectType getOutputObject() { + return this.outputDataObjectType; + } + + @Override + public void setOutputObject(OutputDataObjectType outputObject) { + this.outputDataObjectType = outputObject; + } + + @Override + public InPort getInPort() { + return this.inPort; + } + + @Override + public void setInPort(InPort inPort) { + this.inPort = inPort; + } + +} + http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/InPort.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/InPort.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/InPort.java new file mode 100644 index 0000000..fb8dfa7 --- /dev/null +++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/InPort.java @@ -0,0 +1,41 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.workflow.core.dag.port; + +import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; +import org.apache.airavata.workflow.core.dag.edge.Edge; + +public interface InPort extends Port { + + public void setInputObject(InputDataObjectType inputObject); + + public InputDataObjectType getInputObject(); + + public Edge getEdge(); + + public void addEdge(Edge edge); + + public String getDefaultValue(); + + public void setDefaultValue(String defaultValue); + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/InputPortIml.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/InputPortIml.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/InputPortIml.java new file mode 100644 index 0000000..43a41be --- /dev/null +++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/InputPortIml.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.airavata.workflow.core.dag.port; + +import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; +import org.apache.airavata.workflow.core.dag.edge.Edge; +import org.apache.airavata.workflow.core.dag.nodes.WorkflowNode; + +public class InputPortIml implements InPort { + + private InputDataObjectType inputDataObjectType; + private boolean ready = false; + private String portId; + private Edge edge; + private WorkflowNode node; + private String defaultValue; + + public InputPortIml(String portId) { + this.portId = portId; + } + + @Override + public void setInputObject(InputDataObjectType inputObject) { + this.inputDataObjectType = inputObject; + ready = (inputDataObjectType.getValue() != null && !inputDataObjectType.getValue().equals("")) + || !inputDataObjectType.isIsRequired(); + } + + @Override + public InputDataObjectType getInputObject() { + return this.inputDataObjectType; + } + + @Override + public Edge getEdge() { + return this.edge; + } + + @Override + public void addEdge(Edge edge) { + this.edge = edge; + } + + @Override + public String getDefaultValue() { + return defaultValue; + } + + public void setDefaultValue(String defaultValue) { + this.defaultValue = defaultValue; + } + + @Override + public boolean isReady() { + return getInputObject() != null && (!inputDataObjectType.isIsRequired() || + (inputDataObjectType.getValue() != null && !inputDataObjectType.getValue().equals(""))); + } + + @Override + public WorkflowNode getNode() { + return this.node; + } + + @Override + public void setNode(WorkflowNode workflowNode) { + this.node = workflowNode; + } + + @Override + public String getId() { + return this.portId; + } + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/OutPort.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/OutPort.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/OutPort.java new file mode 100644 index 0000000..d9aa004 --- /dev/null +++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/OutPort.java @@ -0,0 +1,39 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.workflow.core.dag.port; + +import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType; +import org.apache.airavata.workflow.core.dag.edge.Edge; + +import java.util.List; + +public interface OutPort extends Port { + + public void setOutputObject(OutputDataObjectType outputObject); + + public OutputDataObjectType getOutputObject(); + + public List<Edge> getOutEdges(); + + public void addEdge(Edge edge); + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/OutPortImpl.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/OutPortImpl.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/OutPortImpl.java new file mode 100644 index 0000000..9c3f86a --- /dev/null +++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/OutPortImpl.java @@ -0,0 +1,83 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.workflow.core.dag.port; + +import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType; +import org.apache.airavata.workflow.core.dag.edge.Edge; +import org.apache.airavata.workflow.core.dag.nodes.WorkflowNode; + +import java.util.ArrayList; +import java.util.List; + +public class OutPortImpl implements OutPort { + + private OutputDataObjectType outputDataObjectType; + private List<Edge> outEdges = new ArrayList<Edge>(); + private boolean isSatisfy = false; + private String portId; + private WorkflowNode node; + + public OutPortImpl(String portId) { + this.portId = portId; + } + + @Override + public void setOutputObject(OutputDataObjectType outputObject) { + this.outputDataObjectType = outputObject; + } + + @Override + public OutputDataObjectType getOutputObject() { + return this.outputDataObjectType; + } + + @Override + public List<Edge> getOutEdges() { + return this.outEdges; + } + + @Override + public void addEdge(Edge edge) { + this.outEdges.add(edge); + } + + @Override + public boolean isReady() { + return this.outputDataObjectType.getValue() != null + && !this.outputDataObjectType.getValue().equals(""); + } + + @Override + public WorkflowNode getNode() { + return this.node; + } + + @Override + public void setNode(WorkflowNode workflowNode) { + this.node = workflowNode; + } + + @Override + public String getId() { + return portId; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/Port.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/Port.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/Port.java new file mode 100644 index 0000000..e3756cf --- /dev/null +++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/Port.java @@ -0,0 +1,36 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.workflow.core.dag.port; + +import org.apache.airavata.workflow.core.dag.nodes.WorkflowNode; + +public interface Port { + + public boolean isReady(); + + public WorkflowNode getNode(); + + public void setNode(WorkflowNode workflowNode); + + public String getId(); + +}
