Fixed AIRAVATA-1620.
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/774b092d Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/774b092d Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/774b092d Branch: refs/heads/master Commit: 774b092d31e41919b39ca3ce9f1edd5af1c30669 Parents: 44d89ee Author: shamrath <[email protected]> Authored: Fri Mar 6 16:18:44 2015 -0500 Committer: shamrath <[email protected]> Committed: Fri Mar 6 16:18:44 2015 -0500 ---------------------------------------------------------------------- .../server/OrchestratorServerHandler.java | 31 +- .../engine/SimpleWorkflowInterpreter.java | 280 ++++++++----------- .../engine/WorkflowEnactmentService.java | 129 ++++++++- 3 files changed, 259 insertions(+), 181 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/774b092d/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java index d168c26..fe306d7 100644 --- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java +++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java @@ -52,11 +52,20 @@ import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent; import org.apache.airavata.model.messaging.event.MessageType; import org.apache.airavata.model.messaging.event.ProcessSubmitEvent; import org.apache.airavata.model.util.ExecutionType; -import org.apache.airavata.model.workspace.experiment.*; +import org.apache.airavata.model.workspace.experiment.Experiment; +import org.apache.airavata.model.workspace.experiment.ExperimentState; +import org.apache.airavata.model.workspace.experiment.ExperimentStatus; +import org.apache.airavata.model.workspace.experiment.TaskDetails; +import org.apache.airavata.model.workspace.experiment.TaskState; +import org.apache.airavata.model.workspace.experiment.TaskStatus; +import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails; +import org.apache.airavata.model.workspace.experiment.WorkflowNodeState; +import org.apache.airavata.model.workspace.experiment.WorkflowNodeStatus; import org.apache.airavata.orchestrator.core.exception.OrchestratorException; import org.apache.airavata.orchestrator.cpi.OrchestratorService; import org.apache.airavata.orchestrator.cpi.impl.SimpleOrchestratorImpl; import org.apache.airavata.orchestrator.cpi.orchestrator_cpi_serviceConstants; +import org.apache.airavata.orchestrator.util.DataModelUtils; import org.apache.airavata.orchestrator.util.OrchestratorServerThreadPoolExecutor; import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory; import org.apache.airavata.registry.cpi.Registry; @@ -64,17 +73,25 @@ import org.apache.airavata.registry.cpi.RegistryException; import org.apache.airavata.registry.cpi.RegistryModelType; import org.apache.airavata.registry.cpi.utils.Constants.FieldConstants.TaskDetailConstants; import org.apache.airavata.registry.cpi.utils.Constants.FieldConstants.WorkflowNodeConstants; -import org.apache.airavata.orchestrator.util.DataModelUtils; -import org.apache.airavata.simple.workflow.engine.SimpleWorkflowInterpreter; import org.apache.airavata.simple.workflow.engine.WorkflowEnactmentService; import org.apache.thrift.TBase; import org.apache.thrift.TException; -import org.apache.zookeeper.*; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import java.io.File; import java.io.IOException; -import java.util.*; +import java.util.Arrays; +import java.util.Calendar; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; public class OrchestratorServerHandler implements OrchestratorService.Iface, Watcher { @@ -656,10 +673,8 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface, try { WorkflowEnactmentService.getInstance(). submitWorkflow(experimentId, airavataCredStoreToken, getGatewayName(), getRabbitMQProcessPublisher()); - } catch (RegistryException e) { - log.error("Error while launching workflow", e); } catch (Exception e) { - log.error("Error while initializing rabbit mq process publisher"); + log.error("Error while launching workflow", e); } } http://git-wip-us.apache.org/repos/asf/airavata/blob/774b092d/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java index a052e5c..ee7ff6b 100644 --- a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java +++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java @@ -24,8 +24,6 @@ package org.apache.airavata.simple.workflow.engine; import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.common.utils.AiravataUtils; import org.apache.airavata.messaging.core.MessageContext; -import org.apache.airavata.messaging.core.MessageHandler; -import org.apache.airavata.messaging.core.MessagingConstants; import org.apache.airavata.messaging.core.impl.RabbitMQProcessPublisher; import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer; import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType; @@ -66,7 +64,10 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -public class SimpleWorkflowInterpreter implements Runnable{ +/** + * Package-Private class + */ +class SimpleWorkflowInterpreter{ private static final Logger log = LoggerFactory.getLogger(SimpleWorkflowInterpreter.class); private List<WorkflowInputNode> workflowInputNodes; @@ -102,8 +103,11 @@ public class SimpleWorkflowInterpreter implements Runnable{ this.publisher = publisher; } - - public void launchWorkflow() throws Exception { + /** + * Package-Private method. + * @throws Exception + */ + void launchWorkflow() throws Exception { WorkflowFactoryImpl wfFactory = WorkflowFactoryImpl.getInstance(); WorkflowParser workflowParser = wfFactory.getWorkflowParser(experiment.getExperimentID(), credentialToken); log.debug("Initialized workflow parser"); @@ -111,15 +115,16 @@ public class SimpleWorkflowInterpreter implements Runnable{ log.debug("Parsed the workflow and got the workflow input nodes"); // process workflow input nodes processWorkflowInputNodes(getWorkflowInputNodes()); - // initialize the rabbitmq status consumer - statusConsumer = new RabbitMQStatusConsumer(); - consumerId = statusConsumer.listen(new TaskMessageHandler()); - processReadyList(); } // try to remove synchronization tag - private synchronized void processReadyList() throws RegistryException, AiravataException { + /** + * Package-Private method. + * @throws RegistryException + * @throws AiravataException + */ + void processReadyList() throws RegistryException, AiravataException { for (WorkflowNode readyNode : readyList.values()) { if (readyNode instanceof WorkflowOutputNode) { WorkflowOutputNode wfOutputNode = (WorkflowOutputNode) readyNode; @@ -232,10 +237,11 @@ public class SimpleWorkflowInterpreter implements Runnable{ } /** + * Package-Private method. * Remove the workflow node from waiting queue and add it to the ready queue. * @param workflowNode - Workflow Node */ - private synchronized void addToReadyQueue(WorkflowNode workflowNode) { + synchronized void addToReadyQueue(WorkflowNode workflowNode) { waitingList.remove(workflowNode.getId()); readyList.put(workflowNode.getId(), workflowNode); } @@ -265,31 +271,8 @@ public class SimpleWorkflowInterpreter implements Runnable{ readyList.remove(wfOutputNode.getId()); } - @Override - public void run() { - try { - log.debug("Launching workflow"); - launchWorkflow(); - while (continueWorkflow && !(waitingList.isEmpty() && readyList.isEmpty())) { -// processReadyList(); - Thread.sleep(1000); - } - if (continueWorkflow) { - log.info("Successfully launched workflow for experiment : " + getExperiment().getExperimentID()); - } else if (!(waitingList.isEmpty() || readyList.isEmpty())) { - log.error("Workflow couldn't execute all workflow nodes due to an error"); - } - } catch (Exception e) { - log.error("Error launching workflow", e); - } finally { - try { - statusConsumer.stopListen(consumerId); - log.info("Successfully un-bind status consumer for experiment " + getExperiment().getExperimentID()); - } catch (AiravataException e) { - log.error("Error while un-binding status consumer: " + consumerId + " for experiment " - + getExperiment().getExperimentID()); - } - } + boolean isAllDone() { + return !continueWorkflow || (waitingList.isEmpty() && readyList.isEmpty()); } private void setExperiment(String experimentId) throws RegistryException { @@ -297,147 +280,108 @@ public class SimpleWorkflowInterpreter implements Runnable{ log.debug("Retrieve Experiment for experiment id : " + experimentId); } - class TaskMessageHandler implements MessageHandler{ - - @Override - public Map<String, Object> getProperties() { - Map<String, Object> props = new HashMap<String, Object>(); - String gatewayId = "*"; - String experimentId = getExperiment().getExperimentID(); - List<String> routingKeys = new ArrayList<String>(); -// routingKeys.add(gatewayName+ "." + getExperiment().getExperimentID() + ".*"); - routingKeys.add(gatewayId); - routingKeys.add(gatewayId + "." + experimentId); - routingKeys.add(gatewayId + "." + experimentId+ ".*"); - routingKeys.add(gatewayId + "." + experimentId+ ".*.*"); - props.put(MessagingConstants.RABBIT_ROUTING_KEY, routingKeys); - return props; - } - - @Override - public void onMessage(MessageContext msgCtx) { - String message; - if (msgCtx.getType() == MessageType.TASK) { - TaskStatusChangeEvent event = (TaskStatusChangeEvent) msgCtx.getEvent(); - TaskIdentifier taskIdentifier = event.getTaskIdentity(); - handleTaskStatusChangeEvent(event); - message = "Received task output change event , expId : " + taskIdentifier.getExperimentId() + ", taskId : " + taskIdentifier.getTaskId() + ", workflow node Id : " + taskIdentifier.getWorkflowNodeId(); - log.debug(message); - }else if (msgCtx.getType() == MessageType.TASKOUTPUT) { - TaskOutputChangeEvent event = (TaskOutputChangeEvent) msgCtx.getEvent(); - TaskIdentifier taskIdentifier = event.getTaskIdentity(); - handleTaskOutputChangeEvent(event); - message = "Received task output change event , expId : " + taskIdentifier.getExperimentId() + ", taskId : " + taskIdentifier.getTaskId() + ", workflow node Id : " + taskIdentifier.getWorkflowNodeId(); - log.debug(message); - } else { - // not interesting, ignores - } - } - - private void handleTaskOutputChangeEvent(TaskOutputChangeEvent taskOutputChangeEvent) { - - String taskId = taskOutputChangeEvent.getTaskIdentity().getTaskId(); - log.debug("Task Output changed event received for workflow node : " + - taskOutputChangeEvent.getTaskIdentity().getWorkflowNodeId() + ", task : " + taskId); - ProcessContext processContext = processingQueue.get(taskId); - Set<WorkflowNode> tempWfNodeSet = new HashSet<WorkflowNode>(); - if (processContext != null) { - WorkflowNode workflowNode = processContext.getWorkflowNode(); - if (workflowNode instanceof ApplicationNode) { - ApplicationNode applicationNode = (ApplicationNode) workflowNode; - // Workflow node can have one to many output ports and each output port can have one to many links - for (OutPort outPort : applicationNode.getOutputPorts()) { - for (OutputDataObjectType outputDataObjectType : taskOutputChangeEvent.getOutput()) { - if (outPort.getOutputObject().getName().equals(outputDataObjectType.getName())) { - outPort.getOutputObject().setValue(outputDataObjectType.getValue()); - break; - } + synchronized void handleTaskOutputChangeEvent(TaskOutputChangeEvent taskOutputChangeEvent) { + + String taskId = taskOutputChangeEvent.getTaskIdentity().getTaskId(); + log.debug("Task Output changed event received for workflow node : " + + taskOutputChangeEvent.getTaskIdentity().getWorkflowNodeId() + ", task : " + taskId); + ProcessContext processContext = processingQueue.get(taskId); + Set<WorkflowNode> tempWfNodeSet = new HashSet<WorkflowNode>(); + if (processContext != null) { + WorkflowNode workflowNode = processContext.getWorkflowNode(); + if (workflowNode instanceof ApplicationNode) { + ApplicationNode applicationNode = (ApplicationNode) workflowNode; + // Workflow node can have one to many output ports and each output port can have one to many links + for (OutPort outPort : applicationNode.getOutputPorts()) { + for (OutputDataObjectType outputDataObjectType : taskOutputChangeEvent.getOutput()) { + if (outPort.getOutputObject().getName().equals(outputDataObjectType.getName())) { + outPort.getOutputObject().setValue(outputDataObjectType.getValue()); + break; } - for (Edge edge : outPort.getOutEdges()) { - edge.getToPort().getInputObject().setValue(outPort.getOutputObject().getValue()); - if (edge.getToPort().getNode().isReady()) { - addToReadyQueue(edge.getToPort().getNode()); - } + } + for (Edge edge : outPort.getOutEdges()) { + edge.getToPort().getInputObject().setValue(outPort.getOutputObject().getValue()); + if (edge.getToPort().getNode().isReady()) { + addToReadyQueue(edge.getToPort().getNode()); } } } - addToCompleteQueue(processContext); - log.debug("removed task from processing queue : " + taskId); - try { - processReadyList(); - } catch (Exception e) { - log.error("Error while processing ready workflow nodes", e); - continueWorkflow = false; - } + } + addToCompleteQueue(processContext); + log.debug("removed task from processing queue : " + taskId); + try { + processReadyList(); + } catch (Exception e) { + log.error("Error while processing ready workflow nodes", e); + continueWorkflow = false; } } + } - private void handleTaskStatusChangeEvent(TaskStatusChangeEvent taskStatusChangeEvent) { - TaskState taskState = taskStatusChangeEvent.getState(); - TaskIdentifier taskIdentity = taskStatusChangeEvent.getTaskIdentity(); - String taskId = taskIdentity.getTaskId(); - ProcessContext processContext = processingQueue.get(taskId); - if (processContext != null) { - WorkflowNodeState wfNodeState = WorkflowNodeState.INVOKED; - switch (taskState) { - case WAITING: - break; - case STARTED: - break; - case PRE_PROCESSING: - wfNodeState = WorkflowNodeState.INVOKED; - processContext.getWorkflowNode().setState(NodeState.PRE_PROCESSING); - break; - case INPUT_DATA_STAGING: - wfNodeState = WorkflowNodeState.INVOKED; - processContext.getWorkflowNode().setState(NodeState.PRE_PROCESSING); - break; - case EXECUTING: - wfNodeState = WorkflowNodeState.EXECUTING; - processContext.getWorkflowNode().setState(NodeState.EXECUTING); - break; - case OUTPUT_DATA_STAGING: - wfNodeState = WorkflowNodeState.COMPLETED; - processContext.getWorkflowNode().setState(NodeState.POST_PROCESSING); - break; - case POST_PROCESSING: - wfNodeState = WorkflowNodeState.COMPLETED; - processContext.getWorkflowNode().setState(NodeState.POST_PROCESSING); - break; - case COMPLETED: - wfNodeState = WorkflowNodeState.COMPLETED; - processContext.getWorkflowNode().setState(NodeState.EXECUTED); - break; - case FAILED: - wfNodeState = WorkflowNodeState.FAILED; - processContext.getWorkflowNode().setState(NodeState.FAILED); - break; - case UNKNOWN: - wfNodeState = WorkflowNodeState.UNKNOWN; - break; - case CONFIGURING_WORKSPACE: - wfNodeState = WorkflowNodeState.COMPLETED; - break; - case CANCELED: - case CANCELING: - wfNodeState = WorkflowNodeState.CANCELED; - processContext.getWorkflowNode().setState(NodeState.FAILED); - break; - default: - break; - } - if (wfNodeState != WorkflowNodeState.UNKNOWN) { - try { - updateWorkflowNodeStatus(processContext.getWfNodeDetails(), wfNodeState); - } catch (RegistryException e) { - log.error("Error while updating workflow node status update to the registry. nodeInstanceId :" - + processContext.getWfNodeDetails().getNodeInstanceId() + " status to: " - + processContext.getWfNodeDetails().getWorkflowNodeStatus().toString() , e); - } + void handleTaskStatusChangeEvent(TaskStatusChangeEvent taskStatusChangeEvent) { + TaskState taskState = taskStatusChangeEvent.getState(); + TaskIdentifier taskIdentity = taskStatusChangeEvent.getTaskIdentity(); + String taskId = taskIdentity.getTaskId(); + ProcessContext processContext = processingQueue.get(taskId); + if (processContext != null) { + WorkflowNodeState wfNodeState = WorkflowNodeState.INVOKED; + switch (taskState) { + case WAITING: + break; + case STARTED: + break; + case PRE_PROCESSING: + wfNodeState = WorkflowNodeState.INVOKED; + processContext.getWorkflowNode().setState(NodeState.PRE_PROCESSING); + break; + case INPUT_DATA_STAGING: + wfNodeState = WorkflowNodeState.INVOKED; + processContext.getWorkflowNode().setState(NodeState.PRE_PROCESSING); + break; + case EXECUTING: + wfNodeState = WorkflowNodeState.EXECUTING; + processContext.getWorkflowNode().setState(NodeState.EXECUTING); + break; + case OUTPUT_DATA_STAGING: + wfNodeState = WorkflowNodeState.COMPLETED; + processContext.getWorkflowNode().setState(NodeState.POST_PROCESSING); + break; + case POST_PROCESSING: + wfNodeState = WorkflowNodeState.COMPLETED; + processContext.getWorkflowNode().setState(NodeState.POST_PROCESSING); + break; + case COMPLETED: + wfNodeState = WorkflowNodeState.COMPLETED; + processContext.getWorkflowNode().setState(NodeState.EXECUTED); + break; + case FAILED: + wfNodeState = WorkflowNodeState.FAILED; + processContext.getWorkflowNode().setState(NodeState.FAILED); + break; + case UNKNOWN: + wfNodeState = WorkflowNodeState.UNKNOWN; + break; + case CONFIGURING_WORKSPACE: + wfNodeState = WorkflowNodeState.COMPLETED; + break; + case CANCELED: + case CANCELING: + wfNodeState = WorkflowNodeState.CANCELED; + processContext.getWorkflowNode().setState(NodeState.FAILED); + break; + default: + break; + } + if (wfNodeState != WorkflowNodeState.UNKNOWN) { + try { + updateWorkflowNodeStatus(processContext.getWfNodeDetails(), wfNodeState); + } catch (RegistryException e) { + log.error("Error while updating workflow node status update to the registry. nodeInstanceId :" + + processContext.getWfNodeDetails().getNodeInstanceId() + " status to: " + + processContext.getWfNodeDetails().getWorkflowNodeStatus().toString() , e); } } - } - } + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/774b092d/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowEnactmentService.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowEnactmentService.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowEnactmentService.java index ec5acfa..c7ab7b9 100644 --- a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowEnactmentService.java +++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowEnactmentService.java @@ -21,23 +21,46 @@ package org.apache.airavata.simple.workflow.engine; +import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.messaging.core.MessageContext; +import org.apache.airavata.messaging.core.MessageHandler; +import org.apache.airavata.messaging.core.MessagingConstants; import org.apache.airavata.messaging.core.impl.RabbitMQProcessPublisher; -import org.apache.airavata.registry.cpi.RegistryException; +import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer; +import org.apache.airavata.model.messaging.event.MessageType; +import org.apache.airavata.model.messaging.event.TaskIdentifier; +import org.apache.airavata.model.messaging.event.TaskOutputChangeEvent; +import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class WorkflowEnactmentService { private static WorkflowEnactmentService workflowEnactmentService; + private final RabbitMQStatusConsumer statusConsumer; + private String consumerId; private ExecutorService executor; + private Map<String,SimpleWorkflowInterpreter> workflowMap; - private WorkflowEnactmentService () { + private WorkflowEnactmentService () throws AiravataException { executor = Executors.newFixedThreadPool(getThreadPoolSize()); + workflowMap = new ConcurrentHashMap<String, SimpleWorkflowInterpreter>(); + statusConsumer = new RabbitMQStatusConsumer(); + consumerId = statusConsumer.listen(new TaskMessageHandler()); + // register the shutdown hook to un-bind status consumer. + Runtime.getRuntime().addShutdownHook(new EnactmentShutDownHook()); } - public static WorkflowEnactmentService getInstance(){ + public static WorkflowEnactmentService getInstance() throws AiravataException { if (workflowEnactmentService == null) { synchronized (WorkflowEnactmentService.class) { if (workflowEnactmentService == null) { @@ -51,14 +74,110 @@ public class WorkflowEnactmentService { public void submitWorkflow(String experimentId, String credentialToken, String gatewayName, - RabbitMQProcessPublisher publisher) throws RegistryException { + RabbitMQProcessPublisher publisher) throws Exception { SimpleWorkflowInterpreter simpleWorkflowInterpreter = new SimpleWorkflowInterpreter( experimentId, credentialToken,gatewayName, publisher); - executor.execute(simpleWorkflowInterpreter); + workflowMap.put(experimentId, simpleWorkflowInterpreter); + simpleWorkflowInterpreter.launchWorkflow(); + } private int getThreadPoolSize() { return ServerSettings.getEnactmentThreadPoolSize(); } + + private class TaskMessageHandler implements MessageHandler { + + @Override + public Map<String, Object> getProperties() { + Map<String, Object> props = new HashMap<String, Object>(); + String gatewayId = "*"; + String experimentId = "*"; + List<String> routingKeys = new ArrayList<String>(); + routingKeys.add(gatewayId); + routingKeys.add(gatewayId + "." + experimentId); + routingKeys.add(gatewayId + "." + experimentId+ ".*"); + routingKeys.add(gatewayId + "." + experimentId+ ".*.*"); + props.put(MessagingConstants.RABBIT_ROUTING_KEY, routingKeys); + return props; + } + + @Override + public void onMessage(MessageContext msgCtx) { + StatusHandler statusHandler = new StatusHandler(msgCtx); + executor.execute(statusHandler); + } + + + } + + private class StatusHandler implements Runnable{ + private final Logger log = LoggerFactory.getLogger(StatusHandler.class); + + private final MessageContext msgCtx; + + public StatusHandler(MessageContext msgCtx) { + this.msgCtx = msgCtx; + } + + @Override + public void run() { + process(); + } + + private void process() { + String message; + SimpleWorkflowInterpreter simpleWorkflowInterpreter; + if (msgCtx.getType() == MessageType.TASK) { + TaskStatusChangeEvent event = (TaskStatusChangeEvent) msgCtx.getEvent(); + TaskIdentifier taskIdentifier = event.getTaskIdentity(); + simpleWorkflowInterpreter = getInterpreter(taskIdentifier.getExperimentId()); + if (simpleWorkflowInterpreter != null) { + simpleWorkflowInterpreter.handleTaskStatusChangeEvent(event); + } else { + // this happens when Task status messages comes after the Taskoutput messages,as we have worked on + // output changes it is ok to ignore this. + } + message = "Received task output change event , expId : " + taskIdentifier.getExperimentId() + ", taskId : " + taskIdentifier.getTaskId() + ", workflow node Id : " + taskIdentifier.getWorkflowNodeId(); + log.debug(message); + }else if (msgCtx.getType() == MessageType.TASKOUTPUT) { + TaskOutputChangeEvent event = (TaskOutputChangeEvent) msgCtx.getEvent(); + TaskIdentifier taskIdentifier = event.getTaskIdentity(); + simpleWorkflowInterpreter = getInterpreter(taskIdentifier.getExperimentId()); + if (simpleWorkflowInterpreter != null) { + simpleWorkflowInterpreter.handleTaskOutputChangeEvent(event); + if (simpleWorkflowInterpreter.isAllDone()) { + workflowMap.remove(taskIdentifier.getExperimentId()); + } + } else { + throw new IllegalArgumentException("Error while processing TaskOutputChangeEvent, " + + "There is no registered workflow for experiment Id : " + taskIdentifier.getExperimentId()); + } + message = "Received task output change event , expId : " + taskIdentifier.getExperimentId() + ", taskId : " + taskIdentifier.getTaskId() + ", workflow node Id : " + taskIdentifier.getWorkflowNodeId(); + log.debug(message); + } else { + // not interested, ignores + } + } + + private SimpleWorkflowInterpreter getInterpreter(String experimentId){ + return workflowMap.get(experimentId); + } + } + + + private class EnactmentShutDownHook extends Thread { + private final Logger log = LoggerFactory.getLogger(EnactmentShutDownHook.class); + @Override + public void run() { + super.run(); + try { + statusConsumer.stopListen(consumerId); + log.info("Successfully un-binded task status consumer"); + } catch (AiravataException e) { + log.error("Error while un-bind enactment status consumer", e); + } + } + } }
