Repository: airavata
Updated Branches:
  refs/heads/master e9a451dc0 -> 4a978d4f1
saving data in zookeeper when terminate Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/4a978d4f Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/4a978d4f Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/4a978d4f Branch: refs/heads/master Commit: 4a978d4f1347aa683189f7ea5e814d125daaecb1 Parents: e9a451d Author: Chathuri Wimalasena <[email protected]> Authored: Mon May 11 15:52:17 2015 -0400 Committer: Chathuri Wimalasena <[email protected]> Committed: Mon May 11 15:52:17 2015 -0400 ---------------------------------------------------------------------- .../airavata/gfac/server/GfacServerHandler.java | 11 +++ .../airavata/gfac/core/cpi/BetterGfacImpl.java | 84 +++++++------------- 2 files changed, 39 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/4a978d4f/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java index 76497ba..b90c731 100644 --- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java +++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java @@ -403,12 +403,23 @@ public class GfacServerHandler implements GfacService.Iface, Watcher { TBase messageEvent = message.getEvent(); byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent); ThriftUtils.createThriftFromBytes(bytes, event); + GFacUtils.setExperimentCancel(event.getExperimentId(), event.getTaskId(), zk); + 
AiravataZKUtils.getExpStatePath(event.getExperimentId()); cancelJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId(), event.getTokenId()); System.out.println(" Message Received with message id '" + message.getMessageId() + "' and with message type '" + message.getType()); } catch (TException e) { logger.error(e.getMessage(), e); //nobody is listening so nothing to throw rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag()); + } catch (InterruptedException e) { + logger.error(e.getMessage(), e); + rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag()); + } catch (ApplicationSettingsException e) { + logger.error(e.getMessage(), e); + rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag()); + } catch (KeeperException e) { + logger.error(e.getMessage(), e); + rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag()); } } } http://git-wip-us.apache.org/repos/asf/airavata/blob/4a978d4f/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java index 6eeef28..dd82fa7 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java @@ -564,97 +564,69 @@ public class BetterGfacImpl implements GFac,Watcher { } private boolean cancel(JobExecutionContext jobExecutionContext) throws GFacException { - // We need to check whether this job is submitted as a part of a large workflow. If yes, - // we need to setup workflow tracking listener. 
try { - // we cannot call GFacUtils.getZKExperimentStateValue because experiment might be running in some other node -// String expPath = GFacUtils.findExperimentEntry(jobExecutionContext.getExperimentID(), zk); -// int stateVal = 0; -// if(expPath != null){ -// Stat exists = zk.exists(expPath + File.separator + "operation", false); -// zk.getData(expPath + File.separator + "operation", this, exists); -// stateVal = GFacUtils.getZKExperimentStateValue(zk, expPath); // this is the original state came, if we query again it might be different,so we preserve this state in the environment -// } + // we cannot call GFacUtils.getZKExperimentStateValue because experiment might be running in some other node + String expPath = GFacUtils.findExperimentEntry(jobExecutionContext.getExperimentID(), zk); + Stat exists = zk.exists(expPath + File.separator + "operation", false); + zk.getData(expPath + File.separator + "operation", this, exists); + GfacExperimentState gfacExpState = GFacUtils.getZKExperimentState(zk, jobExecutionContext); // this is the original state came, if we query again it might be different,so we preserve this state in the environment monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext) , GfacExperimentState.ACCEPTED)); // immediately we get the request we update the status String workflowInstanceID = null; if ((workflowInstanceID = (String) jobExecutionContext.getProperty(Constants.PROP_WORKFLOW_INSTANCE_ID)) != null) { - // This mean we need to register workflow tracking listener. //todo implement WorkflowTrackingListener properly -// registerWorkflowTrackingListener(workflowInstanceID, jobExecutionContext); } // Register log event listener. This is required in all scenarios. 
jobExecutionContext.getNotificationService().registerListener(new LoggingListener()); -// if (stateVal < 2) { -// // In this scenario We do everything from the beginning -// log.info("Job is not yet submitted, so nothing much to do except changing the registry entry " + -// " and stop the execution chain"); -// } else if (stateVal >= 8) { -// log.error("This experiment is almost finished, so cannot cancel this experiment"); -// ZKUtil.deleteRecursive(zk, -// AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID())); -// } else { + if (isNewJob(gfacExpState)) { + log.info("Job is not yet submitted, so nothing much to do except changing the registry entry " + + " and stop the execution chain"); + } else if (isCompletedJob(gfacExpState)) { + log.error("This experiment is almost finished, so cannot cancel this experiment"); + ZKUtil.deleteRecursive(zk, + AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID())); + } else { log.info("Job is in a position to perform a proper cancellation"); try { Scheduler.schedule(jobExecutionContext); - invokeProviderCancel(jobExecutionContext); - } catch (Exception e) { try { // we make the experiment as failed due to exception scenario monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED)); - // monitorPublisher.publish(new - // ExperimentStatusChangedEvent(new - // ExperimentIdentity(jobExecutionContext.getExperimentID()), - // ExperimentState.FAILED)); - // Updating the task status if there's any task associated - // monitorPublisher.publish(new TaskStatusChangeRequest( - // new TaskIdentity(jobExecutionContext.getExperimentID(), - // jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), - // jobExecutionContext.getTaskData().getTaskID()), - // TaskState.FAILED - // )); JobStatusChangeRequestEvent changeRequestEvent = new JobStatusChangeRequestEvent(); 
changeRequestEvent.setState(JobState.FAILED); JobIdentifier jobIdentifier = new JobIdentifier(jobExecutionContext.getJobDetails().getJobID(), - jobExecutionContext.getTaskData().getTaskID(), - jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), - jobExecutionContext.getExperimentID(), - jobExecutionContext.getGatewayID()); + jobExecutionContext.getTaskData().getTaskID(), + jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), + jobExecutionContext.getExperimentID(), + jobExecutionContext.getGatewayID()); changeRequestEvent.setJobIdentity(jobIdentifier); monitorPublisher.publish(changeRequestEvent); } catch (NullPointerException e1) { log.error("Error occured during updating the statuses of Experiments,tasks or Job statuses to failed, " + "NullPointerException occurred because at this point there might not have Job Created", e1, e); - //monitorPublisher.publish(new ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()), ExperimentState.FAILED)); // Updating the task status if there's any task associated monitorPublisher.publish(new TaskStatusChangeRequestEvent(TaskState.FAILED, - new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(), - jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), - jobExecutionContext.getExperimentID(), - jobExecutionContext.getGatewayID()))); + new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(), + jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), + jobExecutionContext.getExperimentID(), + jobExecutionContext.getGatewayID()))); } jobExecutionContext.setProperty(ERROR_SENT, "true"); jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(e.getCause())); throw new GFacException(e.getMessage(), e); } -// } + } return true; -// } catch (ApplicationSettingsException e) { -// log.error("Error occured while cancelling job for experiment : " + jobExecutionContext.getExperimentID(), e); -// throw new 
GFacException(e.getMessage(), e); -// } catch (KeeperException e) { -// log.error("Error occured while cancelling job for experiment : " + jobExecutionContext.getExperimentID(), e); -// throw new GFacException(e.getMessage(), e); - } catch (Exception e) { - log.error("Error occured while cancelling job for experiment : " + jobExecutionContext.getExperimentID(), e); - throw new GFacException(e.getMessage(), e); - }finally { - closeZK(jobExecutionContext); + }catch(Exception e){ + log.error("Error occured while cancelling job for experiment : " + jobExecutionContext.getExperimentID(), e); + throw new GFacException(e.getMessage(), e); + }finally{ + closeZK(jobExecutionContext); + } } - } private void reLaunch(JobExecutionContext jobExecutionContext, GfacExperimentState state) throws GFacException { // Scheduler will decide the execution flow of handlers and provider
