Orchestrator consumerls experiment submit events

Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/54f5c34d
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/54f5c34d
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/54f5c34d

Branch: refs/heads/lahiru/AIRAVATA-2057
Commit: 54f5c34d4eed40acd643a0388926c3c3b62be526
Parents: 4e3dc9a
Author: Shameera Rathnayaka <[email protected]>
Authored: Thu Aug 11 14:42:48 2016 -0400
Committer: Shameera Rathnayaka <[email protected]>
Committed: Thu Aug 11 14:42:48 2016 -0400

----------------------------------------------------------------------
 .../airavata/messaging/core/MessageHandler.java |  2 +-
 .../server/OrchestratorServerHandler.java       | 37 ++++++++++++++++++--
 2 files changed, 36 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/54f5c34d/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageHandler.java
 
b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageHandler.java
index 23646da..bc47e68 100644
--- 
a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageHandler.java
+++ 
b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageHandler.java
@@ -24,5 +24,5 @@ package org.apache.airavata.messaging.core;
 @FunctionalInterface
 public interface MessageHandler {
 
-    void onMessage(MessageContext message);
+    void onMessage(MessageContext messageContext);
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/54f5c34d/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
 
b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index 03f6f8a..b425c5e 100644
--- 
a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ 
b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -48,6 +48,7 @@ import 
org.apache.airavata.model.error.LaunchValidationException;
 import org.apache.airavata.model.experiment.ExperimentModel;
 import org.apache.airavata.model.experiment.ExperimentType;
 import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.ExperimentSubmitEvent;
 import org.apache.airavata.model.messaging.event.MessageType;
 import org.apache.airavata.model.messaging.event.ProcessIdentifier;
 import org.apache.airavata.model.messaging.event.ProcessStatusChangeEvent;
@@ -99,7 +100,8 @@ public class OrchestratorServerHandler implements 
OrchestratorService.Iface {
        private String airavataUserName;
        private String gatewayName;
        private Publisher publisher;
-       private Subscriber statusSubscribe;
+       private final Subscriber statusSubscribe;
+       private final Subscriber experimentSubscriber;
        private CuratorFramework curatorClient;
 
     /**
@@ -112,7 +114,10 @@ public class OrchestratorServerHandler implements 
OrchestratorService.Iface {
        public OrchestratorServerHandler() throws OrchestratorException{
                try {
                publisher = MessagingFactory.getPublisher(Type.STATUS);
-            setAiravataUserName(ServerSettings.getDefaultUser());
+                       List<String> routingKeys = new ArrayList<>();
+                       
routingKeys.add(ServerSettings.getRabbitmqExperimentLaunchQueueName());
+                       experimentSubscriber = 
MessagingFactory.getSubscriber(new ExperimentHandler(), routingKeys, 
Type.EXPERIMENT_LAUNCH);
+                       setAiravataUserName(ServerSettings.getDefaultUser());
                } catch (AiravataException e) {
             log.error(e.getMessage(), e);
             throw new OrchestratorException("Error while initializing 
orchestrator service", e);
@@ -601,4 +606,32 @@ public class OrchestratorServerHandler implements 
OrchestratorService.Iface {
                        }
                }
        }
+
+
+       private class ExperimentHandler implements MessageHandler {
+
+               @Override
+               public void onMessage(MessageContext messageContext) {
+                       if (messageContext.getType() != MessageType.EXPERIMENT) 
{
+                               
experimentSubscriber.sendAck(messageContext.getDeliveryTag());
+                               log.error("Orchestrator got un-support message 
type : " + messageContext.getType());
+                       }
+                       try {
+                               byte[] bytes = 
ThriftUtils.serializeThriftObject(messageContext.getEvent());
+                               ExperimentSubmitEvent expEvent = new 
ExperimentSubmitEvent();
+                               ThriftUtils.createThriftFromBytes(bytes, 
expEvent);
+                               if (messageContext.isRedeliver()) {
+                    // TODO - handle redelivery scenario
+                    
experimentSubscriber.sendAck(messageContext.getDeliveryTag());
+                } else {
+                    launchExperiment(expEvent.getExperimentId(), 
expEvent.getGatewayId());
+                    
experimentSubscriber.sendAck(messageContext.getDeliveryTag());
+                }
+                       } catch (TException e) {
+                               log.error("Experiment launch failed due to 
Thrift conversion error", e);
+                experimentSubscriber.sendAck(messageContext.getDeliveryTag());
+                       }
+               }
+       }
+
 }

Reply via email to