Repository: airavata
Updated Branches:
  refs/heads/master ea93cc1ef -> ec9b6fe42
check iscancelled from registry experiment status Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/ec9b6fe4 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/ec9b6fe4 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/ec9b6fe4 Branch: refs/heads/master Commit: ec9b6fe42114f2cd7b0a411c40bb844217b169e9 Parents: ea93cc1 Author: Chathuri Wimalasena <[email protected]> Authored: Wed May 13 11:05:20 2015 -0400 Committer: Chathuri Wimalasena <[email protected]> Committed: Wed May 13 11:05:20 2015 -0400 ---------------------------------------------------------------------- .../airavata/gfac/core/cpi/BetterGfacImpl.java | 39 +++-- .../server/OrchestratorServerHandler.java | 143 +++++++++---------- 2 files changed, 85 insertions(+), 97 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/ec9b6fe4/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java index 32317f3..5f0b1a1 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java @@ -59,6 +59,7 @@ import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePrefer import org.apache.airavata.model.messaging.event.*; import org.apache.airavata.model.workspace.experiment.*; import org.apache.airavata.registry.cpi.Registry; +import org.apache.airavata.registry.cpi.RegistryException; import org.apache.airavata.registry.cpi.RegistryModelType; import org.apache.zookeeper.*; import 
org.apache.zookeeper.data.Stat; @@ -82,23 +83,13 @@ import java.util.*; public class BetterGfacImpl implements GFac,Watcher { private static final Logger log = LoggerFactory.getLogger(BetterGfacImpl.class); public static final String ERROR_SENT = "ErrorSent"; - private Registry registry; - private AppCatalog appCatalog; - // we are not storing zk instance in to jobExecution context private ZooKeeper zk; - private static List<ThreadedHandler> daemonHandlers = new ArrayList<ThreadedHandler>(); - private static File gfacConfigFile; - private static List<AbstractActivityListener> activityListeners = new ArrayList<AbstractActivityListener>(); - private static MonitorPublisher monitorPublisher; - - private boolean cancelled = false; - private static Integer mutex = -1; /** @@ -116,7 +107,6 @@ public class BetterGfacImpl implements GFac,Watcher { synchronized (mutex) { mutex.wait(5000); // waiting for the syncConnected event } - this.appCatalog = appCatalog; } public static void startStatusUpdators(Registry registry, ZooKeeper zk, MonitorPublisher publisher,RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer) { @@ -706,7 +696,7 @@ public class BetterGfacImpl implements GFac,Watcher { // here we do not skip handler if some handler does not have to be // run again during re-run it can implement // that logic in to the handler - if (!isCancelled()) { + if (!isCancelled(jobExecutionContext)) { invokeInFlowHandlers(jobExecutionContext); // to keep the // consistency we always // try to re-run to @@ -722,7 +712,7 @@ public class BetterGfacImpl implements GFac,Watcher { // After executing the in handlers provider instance should be set // to job execution context. // We get the provider instance and execute it. 
- if (!isCancelled()) { + if (!isCancelled(jobExecutionContext)) { invokeProviderExecute(jobExecutionContext); } else { log.info("Experiment is cancelled, so launch operation is stopping immediately"); @@ -893,7 +883,7 @@ public class BetterGfacImpl implements GFac,Watcher { monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext) , GfacExperimentState.INHANDLERSINVOKING)); for (GFacHandlerConfig handlerClassName : handlers) { - if(!isCancelled()) { + if(!isCancelled(jobExecutionContext)) { Class<? extends GFacHandler> handlerClass; GFacHandler handler; try { @@ -971,7 +961,7 @@ public class BetterGfacImpl implements GFac,Watcher { try { monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKING)); for (GFacHandlerConfig handlerClassName : handlers) { - if (!isCancelled()) { + if (!isCancelled(jobExecutionContext)) { Class<? extends GFacHandler> handlerClass; GFacHandler handler; try { @@ -1254,12 +1244,18 @@ public class BetterGfacImpl implements GFac,Watcher { } - public boolean isCancelled() { - return cancelled; - } - - public void setCancelled(boolean cancelled) { - this.cancelled = cancelled; + public boolean isCancelled(JobExecutionContext executionContext) throws RegistryException { + // we should check whether experiment is cancelled using registry + ExperimentStatus status = (ExperimentStatus)registry.get(RegistryModelType.EXPERIMENT_STATUS, executionContext.getExperimentID()); + if (status != null){ + ExperimentState experimentState = status.getExperimentState(); + if (experimentState != null){ + if(experimentState == ExperimentState.CANCELING || experimentState == ExperimentState.CANCELED){ + return true; + } + } + } + return false; } public void process(WatchedEvent watchedEvent) { @@ -1267,7 +1263,6 @@ public class BetterGfacImpl implements GFac,Watcher { if (Event.EventType.NodeDataChanged.equals(watchedEvent.getType())) { // node 
data is changed, this means node is cancelled. log.info("Experiment is cancelled with this path:" + watchedEvent.getPath()); - this.cancelled = true; } synchronized (mutex) { Event.KeeperState state = watchedEvent.getState(); http://git-wip-us.apache.org/repos/asf/airavata/blob/ec9b6fe4/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java index 3da1e47..f903373 100644 --- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java +++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java @@ -525,52 +525,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface, throw new OrchestratorException("Error retrieving the Experiment by the given experimentID: " + experimentId); } ExperimentState experimentState = experiment.getExperimentStatus().getExperimentState(); - if (experimentState.getValue()> 5 && experimentState.getValue()<10) { - log.errorId(experimentId, "Unable to mark experiment as Cancelled, current state {} doesn't allow to cancel the experiment {}.", - experiment.getExperimentStatus().getExperimentState().toString(), experimentId); - throw new OrchestratorException("Unable to mark experiment as Cancelled, because current state is: " - + experiment.getExperimentStatus().getExperimentState().toString()); - }else if(experimentState.getValue()<3){ - // when experiment status is < 3 no jobDetails object is created, - // so we don't have to worry, 
we simply have to change the status and stop the execution - ExperimentStatus status = new ExperimentStatus(); - status.setExperimentState(ExperimentState.CANCELED); - status.setTimeOfStateChange(Calendar.getInstance() - .getTimeInMillis()); - experiment.setExperimentStatus(status); - registry.update(RegistryModelType.EXPERIMENT, experiment, - experimentId); - List<String> ids = registry.getIds( - RegistryModelType.WORKFLOW_NODE_DETAIL, - WorkflowNodeConstants.EXPERIMENT_ID, experimentId); - for (String workflowNodeId : ids) { - WorkflowNodeDetails workflowNodeDetail = (WorkflowNodeDetails) registry - .get(RegistryModelType.WORKFLOW_NODE_DETAIL, - workflowNodeId); - WorkflowNodeStatus workflowNodeStatus = new WorkflowNodeStatus(); - workflowNodeStatus.setWorkflowNodeState(WorkflowNodeState.CANCELED); - workflowNodeStatus.setTimeOfStateChange(Calendar.getInstance() - .getTimeInMillis()); - workflowNodeDetail.setWorkflowNodeStatus(workflowNodeStatus); - registry.update(RegistryModelType.WORKFLOW_NODE_DETAIL, workflowNodeDetail, - workflowNodeId); - List<Object> taskDetailList = registry.get( - RegistryModelType.TASK_DETAIL, - TaskDetailConstants.NODE_ID, workflowNodeId); - for (Object o : taskDetailList) { - TaskDetails taskDetails = (TaskDetails) o; - TaskStatus taskStatus = ((TaskDetails) o).getTaskStatus(); - taskStatus.setExecutionState(TaskState.CANCELED); - taskStatus.setTimeOfStateChange(Calendar.getInstance() - .getTimeInMillis()); - taskDetails.setTaskStatus(taskStatus); - registry.update(RegistryModelType.TASK_DETAIL, o, - taskDetails); -// GFacUtils.setExperimentCancel(experimentId, taskDetails.getTaskID(), zk, experimentNode, nodeName, event.getTokenId(), message.getDeliveryTag()); - } - } - }else { - + if (isCancelValid(experimentState)){ ExperimentStatus status = new ExperimentStatus(); status.setExperimentState(ExperimentState.CANCELING); status.setTimeOfStateChange(Calendar.getInstance() @@ -617,49 +572,87 @@ public class OrchestratorServerHandler 
implements OrchestratorService.Iface, taskDetails.setTaskStatus(taskStatus); registry.update(RegistryModelType.TASK_DETAIL, o, taskDetails.getTaskID()); -// GFacUtils.setExperimentCancel(experimentId, taskDetails.getTaskID(), zk, experimentNode, nodeName, event.getTokenId(), message.getDeliveryTag()); } - // iterate through all the generated tasks and performs the - // job submisssion+monitoring - // launching the experiment orchestrator.cancelExperiment(experiment, workflowNodeDetail, taskDetails, tokenId); - - // after performing gfac level cancel operation - // mark task cancelled - taskStatus.setExecutionState(TaskState.CANCELED); - taskStatus.setTimeOfStateChange(Calendar.getInstance() - .getTimeInMillis()); - taskDetails.setTaskStatus(taskStatus); - registry.update(RegistryModelType.TASK_DETAIL, o, - taskDetails.getTaskID()); + // Status update should be done at the monitor } - // mark workflownode cancelled - WorkflowNodeStatus workflowNodeStatus = new WorkflowNodeStatus(); - workflowNodeStatus.setWorkflowNodeState(WorkflowNodeState.CANCELED); - workflowNodeStatus.setTimeOfStateChange(Calendar.getInstance() + } + }else { + if (isCancelAllowed(experimentState)){ + // when experiment status is < 3 no jobDetails object is created, + // so we don't have to worry, we simply have to change the status and stop the execution + ExperimentStatus status = new ExperimentStatus(); + status.setExperimentState(ExperimentState.CANCELED); + status.setTimeOfStateChange(Calendar.getInstance() .getTimeInMillis()); - workflowNodeDetail.setWorkflowNodeStatus(workflowNodeStatus); - registry.update(RegistryModelType.WORKFLOW_NODE_DETAIL, workflowNodeDetail, - workflowNodeId); + experiment.setExperimentStatus(status); + registry.update(RegistryModelType.EXPERIMENT, experiment, + experimentId); + List<String> ids = registry.getIds( + RegistryModelType.WORKFLOW_NODE_DETAIL, + WorkflowNodeConstants.EXPERIMENT_ID, experimentId); + for (String workflowNodeId : ids) { + 
WorkflowNodeDetails workflowNodeDetail = (WorkflowNodeDetails) registry + .get(RegistryModelType.WORKFLOW_NODE_DETAIL, + workflowNodeId); + WorkflowNodeStatus workflowNodeStatus = new WorkflowNodeStatus(); + workflowNodeStatus.setWorkflowNodeState(WorkflowNodeState.CANCELED); + workflowNodeStatus.setTimeOfStateChange(Calendar.getInstance() + .getTimeInMillis()); + workflowNodeDetail.setWorkflowNodeStatus(workflowNodeStatus); + registry.update(RegistryModelType.WORKFLOW_NODE_DETAIL, workflowNodeDetail, + workflowNodeId); + List<Object> taskDetailList = registry.get( + RegistryModelType.TASK_DETAIL, + TaskDetailConstants.NODE_ID, workflowNodeId); + for (Object o : taskDetailList) { + TaskDetails taskDetails = (TaskDetails) o; + TaskStatus taskStatus = ((TaskDetails) o).getTaskStatus(); + taskStatus.setExecutionState(TaskState.CANCELED); + taskStatus.setTimeOfStateChange(Calendar.getInstance() + .getTimeInMillis()); + taskDetails.setTaskStatus(taskStatus); + registry.update(RegistryModelType.TASK_DETAIL, o, + taskDetails); + } + } + }else { + log.errorId(experimentId, "Unable to mark experiment as Cancelled, current state {} doesn't allow to cancel the experiment {}.", + experiment.getExperimentStatus().getExperimentState().toString(), experimentId); + throw new OrchestratorException("Unable to mark experiment as Cancelled, because current state is: " + + experiment.getExperimentStatus().getExperimentState().toString()); } - // mark experiment cancelled - status = new ExperimentStatus(); - status.setExperimentState(ExperimentState.CANCELED); - status.setTimeOfStateChange(Calendar.getInstance() - .getTimeInMillis()); - experiment.setExperimentStatus(status); - registry.update(RegistryModelType.EXPERIMENT, experiment, - experimentId); } log.info("Experiment: " + experimentId + " is cancelled !!!!!"); - } catch (Exception e) { throw new TException(e); } return true; } + private boolean isCancelValid(ExperimentState state){ + switch (state) { + case LAUNCHED: + case 
EXECUTING: + case CANCELING: + return true; + default: + return false; + } + } + + private boolean isCancelAllowed(ExperimentState state){ + switch (state) { + case CREATED: + case VALIDATED: + case SCHEDULED: + return true; + default: + return false; + } + } + private void launchWorkflowExperiment(String experimentId, String airavataCredStoreToken) throws TException { try { WorkflowEnactmentService.getInstance().
