Fixed experiment cancellation issues
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/309a9ff8 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/309a9ff8 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/309a9ff8 Branch: refs/heads/lahiru/AIRAVATA-2057 Commit: 309a9ff83945b59930d9b37cff52e4fb6e405b5c Parents: bc37334 Author: Shameera Rathnayaka <[email protected]> Authored: Tue Aug 16 15:37:14 2016 -0400 Committer: Shameera Rathnayaka <[email protected]> Committed: Tue Aug 16 15:37:14 2016 -0400 ---------------------------------------------------------------------- .../server/handler/AiravataServerHandler.java | 6 ++- .../server/OrchestratorServerHandler.java | 56 +++++++++++++++----- 2 files changed, 48 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/309a9ff8/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java index 8ce1c65..e489b43 100644 --- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java +++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java @@ -1458,13 +1458,15 @@ public class AiravataServerHandler implements Airavata.Iface { switch (experimentStatus.getState()) { case COMPLETED: case CANCELED: case FAILED: case CANCELING: logger.warn("Can't terminate already {} experiment", experimentStatus.getState().name()); + break; case CREATED: logger.warn("Experiment termination is only allowed for launched experiments."); + break; default: submitCancelExperiment(airavataExperimentId, gatewayId); - + logger.debug("Airavata cancelled experiment with experiment id : " + airavataExperimentId); + break; } - logger.debug("Airavata cancelled experiment with experiment id : " + airavataExperimentId); } catch (RegistryServiceException | AiravataException e) { logger.error(airavataExperimentId, "Error while cancelling the experiment...", e); AiravataSystemException exception = new AiravataSystemException(); http://git-wip-us.apache.org/repos/asf/airavata/blob/309a9ff8/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java index b425c5e..17bceb4 100644 --- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java +++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java @@ -90,6 +90,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.function.BiConsumer; public class OrchestratorServerHandler implements OrchestratorService.Iface { private static Logger log = LoggerFactory.getLogger(OrchestratorServerHandler.class); @@ -612,25 +613,56 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { @Override public void onMessage(MessageContext messageContext) { - if (messageContext.getType() != MessageType.EXPERIMENT) { - experimentSubscriber.sendAck(messageContext.getDeliveryTag()); - log.error("Orchestrator got un-support message type : " + messageContext.getType()); + + switch (messageContext.getType()) { + case EXPERIMENT: + launchExperiment(messageContext); + break; + case EXPERIMENT_CANCEL: + cancelExperiment(messageContext); + break; + default: + experimentSubscriber.sendAck(messageContext.getDeliveryTag()); + log.error("Orchestrator got un-support message type : " + messageContext.getType()); + break; } + } + + private void cancelExperiment(MessageContext messageContext) { try { byte[] bytes = ThriftUtils.serializeThriftObject(messageContext.getEvent()); ExperimentSubmitEvent expEvent = new ExperimentSubmitEvent(); ThriftUtils.createThriftFromBytes(bytes, expEvent); - if (messageContext.isRedeliver()) { - // TODO - handle redelivery scenario - experimentSubscriber.sendAck(messageContext.getDeliveryTag()); - } else { - launchExperiment(expEvent.getExperimentId(), expEvent.getGatewayId()); - experimentSubscriber.sendAck(messageContext.getDeliveryTag()); - } + terminateExperiment(expEvent.getExperimentId(), expEvent.getGatewayId()); } catch (TException e) { - log.error("Experiment launch failed due to Thrift conversion error", e); - experimentSubscriber.sendAck(messageContext.getDeliveryTag()); + log.error("Experiment cancellation failed due to Thrift conversion error", e); + }finally { + experimentSubscriber.sendAck(messageContext.getDeliveryTag()); } + + } + } + + private void launchExperiment(MessageContext messageContext) { + try { + byte[] bytes = ThriftUtils.serializeThriftObject(messageContext.getEvent()); + ExperimentSubmitEvent expEvent = new ExperimentSubmitEvent(); + ThriftUtils.createThriftFromBytes(bytes, expEvent); + if (messageContext.isRedeliver()) { + ExperimentModel experimentModel = (ExperimentModel) experimentCatalog. + get(ExperimentCatalogModelType.EXPERIMENT, expEvent.getExperimentId()); + if (experimentModel.getExperimentStatus().getState() == ExperimentState.CREATED) { + launchExperiment(expEvent.getExperimentId(), expEvent.getGatewayId()); + } + } else { + launchExperiment(expEvent.getExperimentId(), expEvent.getGatewayId()); + } + } catch (TException e) { + log.error("Experiment launch failed due to Thrift conversion error", e); + } catch (RegistryException e) { + log.error("Experiment launch failed due to registry access issue", e); + }finally { + experimentSubscriber.sendAck(messageContext.getDeliveryTag()); } }
