Repository: airavata
Updated Branches:
  refs/heads/master 395c9d324 -> 73c8337aa
Fix Airavata-1661 for job cancel Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/73c8337a Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/73c8337a Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/73c8337a Branch: refs/heads/master Commit: 73c8337aa84a8ed6a630c0bed2080a319a6bbc69 Parents: 395c9d3 Author: raminder <[email protected]> Authored: Wed Apr 15 09:58:43 2015 -0400 Committer: raminder <[email protected]> Committed: Wed Apr 15 09:58:43 2015 -0400 ---------------------------------------------------------------------- .../airavata/gfac/server/GfacServerHandler.java | 2 +- .../airavata/gfac/core/cpi/BetterGfacImpl.java | 45 +++++++++++--------- .../core/impl/RabbitMQTaskLaunchConsumer.java | 2 +- .../core/impl/RabbitMQTaskLaunchPublisher.java | 11 +---- .../core/impl/GFACPassiveJobSubmitter.java | 6 ++- 5 files changed, 33 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/73c8337a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java index a535090..261dea4 100644 --- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java +++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java @@ -265,7 +265,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher { logger.infoId(experimentId, "GFac Received cancel job request for Experiment: {} TaskId: {} ", experimentId, taskId); GFac gfac = getGfac(); try { - if 
(gfac.cancel(experimentId, taskId, ServerSettings.getDefaultUserGateway())) { + if (gfac.cancel(experimentId, taskId, gatewayId)) { logger.debugId(experimentId, "Successfully cancelled job, experiment {} , task {}", experimentId, taskId); return true; } else { http://git-wip-us.apache.org/repos/asf/airavata/blob/73c8337a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java index 3fa7237..19c77ac 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java @@ -531,9 +531,14 @@ public class BetterGfacImpl implements GFac,Watcher { // We need to check whether this job is submitted as a part of a large workflow. If yes, // we need to setup workflow tracking listener. 
try { - // we cannot call GFacUtils.getZKExperimentStateValue because experiment might be running in some other node - String expPath = GFacUtils.findExperimentEntry(jobExecutionContext.getExperimentID(), zk); - int stateVal = GFacUtils.getZKExperimentStateValue(zk, expPath); // this is the original state came, if we query again it might be different,so we preserve this state in the environment + // we cannot call GFacUtils.getZKExperimentStateValue because experiment might be running in some other node +// String expPath = GFacUtils.findExperimentEntry(jobExecutionContext.getExperimentID(), zk); +// int stateVal = 0; +// if(expPath != null){ +// Stat exists = zk.exists(expPath + File.separator + "operation", false); +// zk.getData(expPath + File.separator + "operation", this, exists); +// stateVal = GFacUtils.getZKExperimentStateValue(zk, expPath); // this is the original state came, if we query again it might be different,so we preserve this state in the environment +// } monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext) , GfacExperimentState.ACCEPTED)); // immediately we get the request we update the status String workflowInstanceID = null; @@ -544,15 +549,15 @@ public class BetterGfacImpl implements GFac,Watcher { } // Register log event listener. This is required in all scenarios. 
jobExecutionContext.getNotificationService().registerListener(new LoggingListener()); - if (stateVal < 2) { - // In this scenario We do everything from the beginning - log.info("Job is not yet submitted, so nothing much to do except changing the registry entry " + - " and stop the execution chain"); - } else if (stateVal >= 8) { - log.error("This experiment is almost finished, so cannot cancel this experiment"); - ZKUtil.deleteRecursive(zk, - AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID())); - } else { +// if (stateVal < 2) { +// // In this scenario We do everything from the beginning +// log.info("Job is not yet submitted, so nothing much to do except changing the registry entry " + +// " and stop the execution chain"); +// } else if (stateVal >= 8) { +// log.error("This experiment is almost finished, so cannot cancel this experiment"); +// ZKUtil.deleteRecursive(zk, +// AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID())); +// } else { log.info("Job is in a position to perform a proper cancellation"); try { Scheduler.schedule(jobExecutionContext); @@ -599,15 +604,15 @@ public class BetterGfacImpl implements GFac,Watcher { jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(e.getCause())); throw new GFacException(e.getMessage(), e); } - } +// } return true; - } catch (ApplicationSettingsException e) { - log.error("Error occured while cancelling job for experiment : " + jobExecutionContext.getExperimentID(), e); - throw new GFacException(e.getMessage(), e); - } catch (KeeperException e) { - log.error("Error occured while cancelling job for experiment : " + jobExecutionContext.getExperimentID(), e); - throw new GFacException(e.getMessage(), e); - } catch (InterruptedException e) { +// } catch (ApplicationSettingsException e) { +// log.error("Error occured while cancelling job for experiment : " + 
jobExecutionContext.getExperimentID(), e); +// throw new GFacException(e.getMessage(), e); +// } catch (KeeperException e) { +// log.error("Error occured while cancelling job for experiment : " + jobExecutionContext.getExperimentID(), e); +// throw new GFacException(e.getMessage(), e); + } catch (Exception e) { log.error("Error occured while cancelling job for experiment : " + jobExecutionContext.getExperimentID(), e); throw new GFacException(e.getMessage(), e); } http://git-wip-us.apache.org/repos/asf/airavata/blob/73c8337a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java index 0cd1042..8007ece 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java @@ -181,7 +181,7 @@ public class RabbitMQTaskLaunchConsumer { + "' and with message type '" + message.getMessageType() + "' for experimentId: " + taskTerminateEvent.getExperimentId() + "and taskId: " + taskTerminateEvent.getTaskId()); event = taskTerminateEvent; - gatewayId = null; + gatewayId = taskTerminateEvent.getGatewayId(); } System.out.println("*deliveryTag:"+deliveryTag); MessageContext messageContext = new MessageContext(event, message.getMessageType(), message.getMessageId(), gatewayId,deliveryTag); http://git-wip-us.apache.org/repos/asf/airavata/blob/73c8337a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java ---------------------------------------------------------------------- diff --git 
a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java index 919087e..34e2545 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java @@ -36,8 +36,7 @@ import org.slf4j.LoggerFactory; public class RabbitMQTaskLaunchPublisher implements Publisher{ private final static Logger log = LoggerFactory.getLogger(RabbitMQTaskLaunchPublisher.class); private String launchTask; - private String cancelTask; - + private RabbitMQProducer rabbitMQProducer; public RabbitMQTaskLaunchPublisher() throws Exception { @@ -45,7 +44,6 @@ public class RabbitMQTaskLaunchPublisher implements Publisher{ try { brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL); launchTask = ServerSettings.getLaunchQueueName(); - cancelTask = ServerSettings.getCancelQueueName(); } catch (ApplicationSettingsException e) { String message = "Failed to get read the required properties from airavata to initialize rabbitmq"; log.error(message, e); @@ -64,12 +62,7 @@ public class RabbitMQTaskLaunchPublisher implements Publisher{ message.setMessageId(msgCtx.getMessageId()); message.setMessageType(msgCtx.getType()); message.setUpdatedTime(msgCtx.getUpdatedTime().getTime()); - String routingKey = null; - if (msgCtx.getType().equals(MessageType.LAUNCHTASK)){ - routingKey = launchTask; - }else if(msgCtx.getType().equals(MessageType.TERMINATETASK)){ - routingKey = cancelTask; - } + String routingKey = launchTask; byte[] messageBody = ThriftUtils.serializeThriftObject(message); rabbitMQProducer.sendToWorkerQueue(messageBody, routingKey); log.info("Successfully published to launch queue ..."); 
http://git-wip-us.apache.org/repos/asf/airavata/blob/73c8337a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java index af60d85..915bddf 100644 --- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java +++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java @@ -36,6 +36,7 @@ import org.apache.airavata.messaging.core.Publisher; import org.apache.airavata.messaging.core.PublisherFactory; import org.apache.airavata.model.messaging.event.MessageType; import org.apache.airavata.model.messaging.event.TaskSubmitEvent; +import org.apache.airavata.model.messaging.event.TaskTerminateEvent; import org.apache.airavata.orchestrator.core.context.OrchestratorContext; import org.apache.airavata.orchestrator.core.exception.OrchestratorException; import org.apache.airavata.orchestrator.core.job.JobSubmitter; @@ -186,8 +187,9 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher { String[] split = gfacNodeData.split(":"); if (zk.exists(gfacServer + File.separator + pickedChild, false) != null) { // before submitting the job we check again the state of the node - TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent(experimentID, taskID, gatewayId, tokenId); - MessageContext messageContext = new MessageContext(taskSubmitEvent, MessageType.TERMINATETASK, "LAUNCH.TERMINATE-" + UUID.randomUUID().toString(), gatewayId); + TaskTerminateEvent taskTerminateEvent = new TaskTerminateEvent(experimentID, taskID, gatewayId, 
tokenId); + MessageContext messageContext = new MessageContext(taskTerminateEvent, MessageType.TERMINATETASK, "LAUNCH.TERMINATE-" + UUID.randomUUID().toString(), gatewayId); + messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); publisher.publish(messageContext); } }
