Orchestrator consumes experiment submit events
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/54f5c34d Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/54f5c34d Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/54f5c34d Branch: refs/heads/lahiru/AIRAVATA-2057 Commit: 54f5c34d4eed40acd643a0388926c3c3b62be526 Parents: 4e3dc9a Author: Shameera Rathnayaka <[email protected]> Authored: Thu Aug 11 14:42:48 2016 -0400 Committer: Shameera Rathnayaka <[email protected]> Committed: Thu Aug 11 14:42:48 2016 -0400 ---------------------------------------------------------------------- .../airavata/messaging/core/MessageHandler.java | 2 +- .../server/OrchestratorServerHandler.java | 37 ++++++++++++++++++-- 2 files changed, 36 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/54f5c34d/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageHandler.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageHandler.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageHandler.java index 23646da..bc47e68 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageHandler.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageHandler.java @@ -24,5 +24,5 @@ package org.apache.airavata.messaging.core; @FunctionalInterface public interface MessageHandler { - void onMessage(MessageContext message); + void onMessage(MessageContext messageContext); } http://git-wip-us.apache.org/repos/asf/airavata/blob/54f5c34d/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java ---------------------------------------------------------------------- diff 
--git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java index 03f6f8a..b425c5e 100644 --- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java +++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java @@ -48,6 +48,7 @@ import org.apache.airavata.model.error.LaunchValidationException; import org.apache.airavata.model.experiment.ExperimentModel; import org.apache.airavata.model.experiment.ExperimentType; import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent; +import org.apache.airavata.model.messaging.event.ExperimentSubmitEvent; import org.apache.airavata.model.messaging.event.MessageType; import org.apache.airavata.model.messaging.event.ProcessIdentifier; import org.apache.airavata.model.messaging.event.ProcessStatusChangeEvent; @@ -99,7 +100,8 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { private String airavataUserName; private String gatewayName; private Publisher publisher; - private Subscriber statusSubscribe; + private final Subscriber statusSubscribe; + private final Subscriber experimentSubscriber; private CuratorFramework curatorClient; /** @@ -112,7 +114,10 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { public OrchestratorServerHandler() throws OrchestratorException{ try { publisher = MessagingFactory.getPublisher(Type.STATUS); - setAiravataUserName(ServerSettings.getDefaultUser()); + List<String> routingKeys = new ArrayList<>(); + routingKeys.add(ServerSettings.getRabbitmqExperimentLaunchQueueName()); + experimentSubscriber = MessagingFactory.getSubscriber(new ExperimentHandler(), routingKeys, 
Type.EXPERIMENT_LAUNCH); + setAiravataUserName(ServerSettings.getDefaultUser()); } catch (AiravataException e) { log.error(e.getMessage(), e); throw new OrchestratorException("Error while initializing orchestrator service", e); @@ -601,4 +606,32 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { } } } + + + private class ExperimentHandler implements MessageHandler { + + @Override + public void onMessage(MessageContext messageContext) { + if (messageContext.getType() != MessageType.EXPERIMENT) { + experimentSubscriber.sendAck(messageContext.getDeliveryTag()); + log.error("Orchestrator got un-support message type : " + messageContext.getType()); + } + try { + byte[] bytes = ThriftUtils.serializeThriftObject(messageContext.getEvent()); + ExperimentSubmitEvent expEvent = new ExperimentSubmitEvent(); + ThriftUtils.createThriftFromBytes(bytes, expEvent); + if (messageContext.isRedeliver()) { + // TODO - handle redelivery scenario + experimentSubscriber.sendAck(messageContext.getDeliveryTag()); + } else { + launchExperiment(expEvent.getExperimentId(), expEvent.getGatewayId()); + experimentSubscriber.sendAck(messageContext.getDeliveryTag()); + } + } catch (TException e) { + log.error("Experiment launch failed due to Thrift conversion error", e); + experimentSubscriber.sendAck(messageContext.getDeliveryTag()); + } + } + } + }
