Fixed some compilation issues in the workflow-core model
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/b65079a7 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/b65079a7 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/b65079a7 Branch: refs/heads/master Commit: b65079a75e8a67d4c48832bcd7a168a0aa3b7d99 Parents: b5d568d Author: Shameera Rathnayaka <[email protected]> Authored: Mon Feb 1 12:01:48 2016 -0500 Committer: Shameera Rathnayaka <[email protected]> Committed: Mon Feb 1 12:01:48 2016 -0500 ---------------------------------------------------------------------- .../airavata/workflow/core/ProcessContext.java | 62 ----------- .../core/SimpleWorkflowInterpreter.java | 111 +++++++++---------- .../airavata/workflow/core/WorkflowContext.java | 60 ++++++++++ .../workflow/core/WorkflowEnactmentService.java | 22 ++-- .../core/parser/JsonWorkflowParser.java | 21 ++++ 5 files changed, 140 insertions(+), 136 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/b65079a7/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/ProcessContext.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/ProcessContext.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/ProcessContext.java deleted file mode 100644 index cf78f9e..0000000 --- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/ProcessContext.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.airavata.workflow.core; - -import org.apache.airavata.model.experiment.TaskDetails; -import org.apache.airavata.model.experiment.WorkflowNodeDetails; -import org.apache.airavata.workflow.core.dag.nodes.WorkflowNode; - -public class ProcessContext { - private WorkflowNode workflowNode; - private WorkflowNodeDetails wfNodeDetails; - private TaskDetails taskDetails; - - public ProcessContext(WorkflowNode workflowNode, WorkflowNodeDetails wfNodeDetails, TaskDetails taskDetails) { - this.workflowNode = workflowNode; - this.wfNodeDetails = wfNodeDetails; - this.taskDetails = taskDetails; - } - - public WorkflowNode getWorkflowNode() { - return workflowNode; - } - - public void setWorkflowNode(WorkflowNode workflowNode) { - this.workflowNode = workflowNode; - } - - public WorkflowNodeDetails getWfNodeDetails() { - return wfNodeDetails; - } - - public void setWfNodeDetails(WorkflowNodeDetails wfNodeDetails) { - this.wfNodeDetails = wfNodeDetails; - } - - public TaskDetails getTaskDetails() { - return taskDetails; - } - - public void setTaskDetails(TaskDetails taskDetails) { - this.taskDetails = taskDetails; - } -} http://git-wip-us.apache.org/repos/asf/airavata/blob/b65079a7/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/SimpleWorkflowInterpreter.java ---------------------------------------------------------------------- diff --git 
a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/SimpleWorkflowInterpreter.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/SimpleWorkflowInterpreter.java index 5d00354..defbad4 100644 --- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/SimpleWorkflowInterpreter.java +++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/SimpleWorkflowInterpreter.java @@ -24,23 +24,15 @@ package org.apache.airavata.workflow.core; import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.common.utils.AiravataUtils; import org.apache.airavata.messaging.core.MessageContext; -import org.apache.airavata.messaging.core.impl.RabbitMQProcessPublisher; +import org.apache.airavata.messaging.core.impl.RabbitMQProcessLaunchPublisher; import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer; -import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType; -import org.apache.airavata.model.messaging.event.MessageType; -import org.apache.airavata.model.messaging.event.ProcessSubmitEvent; -import org.apache.airavata.model.messaging.event.TaskIdentifier; -import org.apache.airavata.model.messaging.event.TaskOutputChangeEvent; -import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent; -import org.apache.airavata.model.util.ExperimentModelUtil; -import org.apache.airavata.model.experiment.ExecutionUnit; +import org.apache.airavata.model.application.io.OutputDataObjectType; import org.apache.airavata.model.experiment.ExperimentModel; -import org.apache.airavata.model.experiment.TaskDetails; -import org.apache.airavata.model.experiment.TaskState; -import org.apache.airavata.model.experiment.WorkflowNodeDetails; -import org.apache.airavata.model.experiment.WorkflowNodeState; -import org.apache.airavata.model.experiment.WorkflowNodeStatus; +import org.apache.airavata.model.messaging.event.*; +import 
org.apache.airavata.model.status.ProcessState; +import org.apache.airavata.model.util.ExperimentModelUtil; import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory; +import org.apache.airavata.registry.core.experiment.catalog.model.Experiment; import org.apache.airavata.registry.cpi.*; import org.apache.airavata.workflow.core.dag.edge.Edge; import org.apache.airavata.workflow.core.dag.nodes.ApplicationNode; @@ -69,7 +61,7 @@ class SimpleWorkflowInterpreter{ private static final Logger log = LoggerFactory.getLogger(SimpleWorkflowInterpreter.class); private List<WorkflowInputNode> workflowInputNodes; - private Experiment experiment; + private ExperimentModel experiment; private String credentialToken; @@ -77,23 +69,23 @@ class SimpleWorkflowInterpreter{ private Map<String, WorkflowNode> readyList = new ConcurrentHashMap<String, WorkflowNode>(); private Map<String, WorkflowNode> waitingList = new ConcurrentHashMap<String, WorkflowNode>(); - private Map<String, ProcessContext> processingQueue = new ConcurrentHashMap<String, ProcessContext>(); - private Map<String, ProcessContext> completeList = new HashMap<String, ProcessContext>(); + private Map<String, WorkflowContext> processingQueue = new ConcurrentHashMap<String, WorkflowContext>(); + private Map<String, WorkflowContext> completeList = new HashMap<String, WorkflowContext>(); private Registry registry; private List<WorkflowOutputNode> completeWorkflowOutputs = new ArrayList<WorkflowOutputNode>(); - private RabbitMQProcessPublisher publisher; + private RabbitMQProcessLaunchPublisher publisher; private RabbitMQStatusConsumer statusConsumer; private String consumerId; private boolean continueWorkflow = true; - public SimpleWorkflowInterpreter(String experimentId, String credentialToken, String gatewayName, RabbitMQProcessPublisher publisher) throws RegistryException { + public SimpleWorkflowInterpreter(String experimentId, String credentialToken, String gatewayName, 
RabbitMQProcessLaunchPublisher publisher) throws RegistryException { this.gatewayName = gatewayName; setExperiment(experimentId); this.credentialToken = credentialToken; this.publisher = publisher; } - public SimpleWorkflowInterpreter(Experiment experiment, String credentialStoreToken, String gatewayName, RabbitMQProcessPublisher publisher) { + public SimpleWorkflowInterpreter(ExperimentModel experiment, String credentialStoreToken, String gatewayName, RabbitMQProcessLaunchPublisher publisher) { this.gatewayName = gatewayName; this.experiment = experiment; this.credentialToken = credentialStoreToken; @@ -106,7 +98,7 @@ class SimpleWorkflowInterpreter{ */ void launchWorkflow() throws Exception { WorkflowFactoryImpl wfFactory = WorkflowFactoryImpl.getInstance(); - WorkflowParser workflowParser = wfFactory.getWorkflowParser(experiment.getExperimentID(), credentialToken); + WorkflowParser workflowParser = wfFactory.getWorkflowParser(experiment.getExperimentId(), credentialToken); log.debug("Initialized workflow parser"); setWorkflowInputNodes(workflowParser.parse()); log.debug("Parsed the workflow and got the workflow input nodes"); @@ -144,8 +136,8 @@ class SimpleWorkflowInterpreter{ } WorkflowNodeDetails workflowNodeDetails = createWorkflowNodeDetails(readyNode); TaskDetails process = getProcess(workflowNodeDetails); - ProcessContext processContext = new ProcessContext(readyNode, workflowNodeDetails, process); - addToProcessingQueue(processContext); + WorkflowContext workflowContext = new WorkflowContext(readyNode, workflowNodeDetails, process); + addToProcessingQueue(workflowContext); publishToProcessQueue(process); } if (processingQueue.isEmpty()) { @@ -277,16 +269,16 @@ class SimpleWorkflowInterpreter{ /** * First remove the node from ready list and then add the WfNodeContainer to the process queue. * Note that underline data structure of the process queue is a Map. 
- * @param processContext - has both workflow and correspond workflowNodeDetails and TaskDetails + * @param workflowContext - has both workflow and correspond workflowNodeDetails and TaskDetails */ - private synchronized void addToProcessingQueue(ProcessContext processContext) { - readyList.remove(processContext.getWorkflowNode().getId()); - processingQueue.put(processContext.getTaskDetails().getTaskID(), processContext); + private synchronized void addToProcessingQueue(WorkflowContext workflowContext) { + readyList.remove(workflowContext.getWorkflowNode().getId()); + processingQueue.put(workflowContext.getTaskDetails().getTaskID(), workflowContext); } - private synchronized void addToCompleteQueue(ProcessContext processContext) { - processingQueue.remove(processContext.getTaskDetails().getTaskID()); - completeList.put(processContext.getTaskDetails().getTaskID(), processContext); + private synchronized void addToCompleteQueue(WorkflowContext workflowContext) { + processingQueue.remove(workflowContext.getTaskDetails().getTaskID()); + completeList.put(workflowContext.getTaskDetails().getTaskID(), workflowContext); } @@ -309,10 +301,10 @@ class SimpleWorkflowInterpreter{ String taskId = taskOutputChangeEvent.getTaskIdentity().getTaskId(); log.debug("Task Output changed event received for workflow node : " + taskOutputChangeEvent.getTaskIdentity().getWorkflowNodeId() + ", task : " + taskId); - ProcessContext processContext = processingQueue.get(taskId); + WorkflowContext workflowContext = processingQueue.get(taskId); Set<WorkflowNode> tempWfNodeSet = new HashSet<WorkflowNode>(); - if (processContext != null) { - WorkflowNode workflowNode = processContext.getWorkflowNode(); + if (workflowContext != null) { + WorkflowNode workflowNode = workflowContext.getWorkflowNode(); if (workflowNode instanceof ApplicationNode) { ApplicationNode applicationNode = (ApplicationNode) workflowNode; // Workflow node can have one to many output ports and each output port can have one to 
many links @@ -331,7 +323,7 @@ class SimpleWorkflowInterpreter{ } } } - addToCompleteQueue(processContext); + addToCompleteQueue(workflowContext); log.debug("removed task from processing queue : " + taskId); try { processReadyList(); @@ -342,67 +334,64 @@ class SimpleWorkflowInterpreter{ } } - void handleTaskStatusChangeEvent(TaskStatusChangeEvent taskStatusChangeEvent) { - TaskState taskState = taskStatusChangeEvent.getState(); - TaskIdentifier taskIdentity = taskStatusChangeEvent.getTaskIdentity(); - String taskId = taskIdentity.getTaskId(); - ProcessContext processContext = processingQueue.get(taskId); - if (processContext != null) { + void handleProcessStatusChangeEvent(ProcessStatusChangeEvent processStatusChangeEvent) { + ProcessState processState = processStatusChangeEvent.getState(); + ProcessIdentifier processIdentity = processStatusChangeEvent.getProcessIdentity(); + String processId = processIdentity.getProcessId(); + WorkflowContext workflowContext = processingQueue.get(processId); + if (workflowContext != null) { WorkflowNodeState wfNodeState = WorkflowNodeState.INVOKED; - switch (taskState) { - case WAITING: - break; + switch (processState) { + case CREATED: + case VALIDATED: case STARTED: break; + case CONFIGURING_WORKSPACE: + wfNodeState = WorkflowNodeState.COMPLETED; + break; case PRE_PROCESSING: wfNodeState = WorkflowNodeState.INVOKED; - processContext.getWorkflowNode().setState(NodeState.PRE_PROCESSING); + workflowContext.getWorkflowNode().setState(NodeState.PRE_PROCESSING); break; case INPUT_DATA_STAGING: wfNodeState = WorkflowNodeState.INVOKED; - processContext.getWorkflowNode().setState(NodeState.PRE_PROCESSING); + workflowContext.getWorkflowNode().setState(NodeState.PRE_PROCESSING); break; case EXECUTING: wfNodeState = WorkflowNodeState.EXECUTING; - processContext.getWorkflowNode().setState(NodeState.EXECUTING); + workflowContext.getWorkflowNode().setState(NodeState.EXECUTING); break; case OUTPUT_DATA_STAGING: wfNodeState = 
WorkflowNodeState.COMPLETED; - processContext.getWorkflowNode().setState(NodeState.POST_PROCESSING); + workflowContext.getWorkflowNode().setState(NodeState.POST_PROCESSING); break; case POST_PROCESSING: wfNodeState = WorkflowNodeState.COMPLETED; - processContext.getWorkflowNode().setState(NodeState.POST_PROCESSING); + workflowContext.getWorkflowNode().setState(NodeState.POST_PROCESSING); break; case COMPLETED: wfNodeState = WorkflowNodeState.COMPLETED; - processContext.getWorkflowNode().setState(NodeState.EXECUTED); + workflowContext.getWorkflowNode().setState(NodeState.EXECUTED); break; case FAILED: wfNodeState = WorkflowNodeState.FAILED; - processContext.getWorkflowNode().setState(NodeState.FAILED); - break; - case UNKNOWN: - wfNodeState = WorkflowNodeState.UNKNOWN; - break; - case CONFIGURING_WORKSPACE: - wfNodeState = WorkflowNodeState.COMPLETED; + workflowContext.getWorkflowNode().setState(NodeState.FAILED); break; case CANCELED: - case CANCELING: + case CANCELLING: wfNodeState = WorkflowNodeState.CANCELED; - processContext.getWorkflowNode().setState(NodeState.FAILED); + workflowContext.getWorkflowNode().setState(NodeState.FAILED); break; default: break; } if (wfNodeState != WorkflowNodeState.UNKNOWN) { try { - updateWorkflowNodeStatus(processContext.getWfNodeDetails(), wfNodeState); + updateWorkflowNodeStatus(workflowContext.getWfNodeDetails(), wfNodeState); } catch (RegistryException e) { log.error("Error while updating workflow node status update to the registry. 
nodeInstanceId :" - + processContext.getWfNodeDetails().getNodeInstanceId() + " status to: " - + processContext.getWfNodeDetails().getWorkflowNodeStatus().toString() , e); + + workflowContext.getWfNodeDetails().getNodeInstanceId() + " status to: " + + workflowContext.getWfNodeDetails().getWorkflowNodeStatus().toString() , e); } } } http://git-wip-us.apache.org/repos/asf/airavata/blob/b65079a7/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowContext.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowContext.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowContext.java new file mode 100644 index 0000000..47bd9ca --- /dev/null +++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowContext.java @@ -0,0 +1,60 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +package org.apache.airavata.workflow.core; + +import org.apache.airavata.workflow.core.dag.nodes.WorkflowNode; + +public class WorkflowContext { + private WorkflowNode workflowNode; + private WorkflowNodeDetails wfNodeDetails; + private TaskDetails taskDetails; + + public WorkflowContext(WorkflowNode workflowNode, WorkflowNodeDetails wfNodeDetails, TaskDetails taskDetails) { + this.workflowNode = workflowNode; + this.wfNodeDetails = wfNodeDetails; + this.taskDetails = taskDetails; + } + + public WorkflowNode getWorkflowNode() { + return workflowNode; + } + + public void setWorkflowNode(WorkflowNode workflowNode) { + this.workflowNode = workflowNode; + } + + public WorkflowNodeDetails getWfNodeDetails() { + return wfNodeDetails; + } + + public void setWfNodeDetails(WorkflowNodeDetails wfNodeDetails) { + this.wfNodeDetails = wfNodeDetails; + } + + public TaskDetails getTaskDetails() { + return taskDetails; + } + + public void setTaskDetails(TaskDetails taskDetails) { + this.taskDetails = taskDetails; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/b65079a7/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java index 7795296..8ca0706 100644 --- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java +++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java @@ -26,12 +26,9 @@ import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.messaging.core.MessageContext; import org.apache.airavata.messaging.core.MessageHandler; import 
org.apache.airavata.messaging.core.MessagingConstants; -import org.apache.airavata.messaging.core.impl.RabbitMQProcessPublisher; +import org.apache.airavata.messaging.core.impl.RabbitMQProcessLaunchPublisher; import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer; -import org.apache.airavata.model.messaging.event.MessageType; -import org.apache.airavata.model.messaging.event.TaskIdentifier; -import org.apache.airavata.model.messaging.event.TaskOutputChangeEvent; -import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent; +import org.apache.airavata.model.messaging.event.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,7 +71,7 @@ public class WorkflowEnactmentService { public void submitWorkflow(String experimentId, String credentialToken, String gatewayName, - RabbitMQProcessPublisher publisher) throws Exception { + RabbitMQProcessLaunchPublisher publisher) throws Exception { SimpleWorkflowInterpreter simpleWorkflowInterpreter = new SimpleWorkflowInterpreter( experimentId, credentialToken,gatewayName, publisher); @@ -129,18 +126,17 @@ public class WorkflowEnactmentService { private void process() { String message; SimpleWorkflowInterpreter simpleWorkflowInterpreter; - if (msgCtx.getType() == MessageType.TASK) { - TaskStatusChangeEvent event = (TaskStatusChangeEvent) msgCtx.getEvent(); - TaskIdentifier taskIdentifier = event.getTaskIdentity(); - simpleWorkflowInterpreter = getInterpreter(taskIdentifier.getExperimentId()); + if (msgCtx.getType() == MessageType.PROCESS) { + ProcessStatusChangeEvent event = ((ProcessStatusChangeEvent) msgCtx.getEvent()); + ProcessIdentifier processIdentity = event.getProcessIdentity(); + simpleWorkflowInterpreter = getInterpreter(processIdentity.getExperimentId()); if (simpleWorkflowInterpreter != null) { - simpleWorkflowInterpreter.handleTaskStatusChangeEvent(event); + simpleWorkflowInterpreter.handleProcessStatusChangeEvent(event); } else { // this happens when Task status messages comes 
after the Taskoutput messages,as we have worked on // output changes it is ok to ignore this. } - message = "Received task output change event , expId : " + taskIdentifier.getExperimentId() + ", taskId : " + taskIdentifier.getTaskId() + ", workflow node Id : " + taskIdentifier.getWorkflowNodeId(); - log.debug(message); + }else if (msgCtx.getType() == MessageType.TASKOUTPUT) { TaskOutputChangeEvent event = (TaskOutputChangeEvent) msgCtx.getEvent(); TaskIdentifier taskIdentifier = event.getTaskIdentity(); http://git-wip-us.apache.org/repos/asf/airavata/blob/b65079a7/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/parser/JsonWorkflowParser.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/parser/JsonWorkflowParser.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/parser/JsonWorkflowParser.java new file mode 100644 index 0000000..6c839e7 --- /dev/null +++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/parser/JsonWorkflowParser.java @@ -0,0 +1,21 @@ +package org.apache.airavata.workflow.core.parser; + +import org.apache.airavata.workflow.core.WorkflowParser; +import org.apache.airavata.workflow.core.dag.nodes.WorkflowInputNode; + +import java.util.List; + +/** + * Created by syodage on 1/27/16. + */ +public class JsonWorkflowParser implements WorkflowParser { + + public JsonWorkflowParser(String workflowDescription) { + + } + + @Override + public List<WorkflowInputNode> parse() throws Exception { + return null; + } +}
