Fixed experiment cancellation issues

Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/309a9ff8
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/309a9ff8
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/309a9ff8

Branch: refs/heads/lahiru/AIRAVATA-2057
Commit: 309a9ff83945b59930d9b37cff52e4fb6e405b5c
Parents: bc37334
Author: Shameera Rathnayaka <[email protected]>
Authored: Tue Aug 16 15:37:14 2016 -0400
Committer: Shameera Rathnayaka <[email protected]>
Committed: Tue Aug 16 15:37:14 2016 -0400

----------------------------------------------------------------------
 .../server/handler/AiravataServerHandler.java   |  6 ++-
 .../server/OrchestratorServerHandler.java       | 56 +++++++++++++++-----
 2 files changed, 48 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/309a9ff8/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
----------------------------------------------------------------------
diff --git 
a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
 
b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
index 8ce1c65..e489b43 100644
--- 
a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
+++ 
b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
@@ -1458,13 +1458,15 @@ public class AiravataServerHandler implements 
Airavata.Iface {
             switch (experimentStatus.getState()) {
                 case COMPLETED: case CANCELED: case FAILED: case CANCELING:
                     logger.warn("Can't terminate already {} experiment", 
experimentStatus.getState().name());
+                    break;
                 case CREATED:
                     logger.warn("Experiment termination is only allowed for 
launched experiments.");
+                    break;
                 default:
                     submitCancelExperiment(airavataExperimentId, gatewayId);
-
+                    logger.debug("Airavata cancelled experiment with 
experiment id : " + airavataExperimentId);
+                    break;
             }
-            logger.debug("Airavata cancelled experiment with experiment id : " 
+ airavataExperimentId);
         } catch (RegistryServiceException | AiravataException e) {
             logger.error(airavataExperimentId, "Error while cancelling the 
experiment...", e);
             AiravataSystemException exception = new AiravataSystemException();

http://git-wip-us.apache.org/repos/asf/airavata/blob/309a9ff8/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
 
b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index b425c5e..17bceb4 100644
--- 
a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ 
b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -90,6 +90,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.function.BiConsumer;
 
 public class OrchestratorServerHandler implements OrchestratorService.Iface {
        private static Logger log = 
LoggerFactory.getLogger(OrchestratorServerHandler.class);
@@ -612,25 +613,56 @@ public class OrchestratorServerHandler implements 
OrchestratorService.Iface {
 
                @Override
                public void onMessage(MessageContext messageContext) {
-                       if (messageContext.getType() != MessageType.EXPERIMENT) 
{
-                               
experimentSubscriber.sendAck(messageContext.getDeliveryTag());
-                               log.error("Orchestrator got un-support message 
type : " + messageContext.getType());
+
+                       switch (messageContext.getType()) {
+                               case EXPERIMENT:
+                                       launchExperiment(messageContext);
+                                       break;
+                               case EXPERIMENT_CANCEL:
+                    cancelExperiment(messageContext);
+                                       break;
+                               default:
+                                       
experimentSubscriber.sendAck(messageContext.getDeliveryTag());
+                                       log.error("Orchestrator got un-support 
message type : " + messageContext.getType());
+                                       break;
                        }
+               }
+
+               private void cancelExperiment(MessageContext messageContext) {
                        try {
                                byte[] bytes = 
ThriftUtils.serializeThriftObject(messageContext.getEvent());
                                ExperimentSubmitEvent expEvent = new 
ExperimentSubmitEvent();
                                ThriftUtils.createThriftFromBytes(bytes, 
expEvent);
-                               if (messageContext.isRedeliver()) {
-                    // TODO - handle redelivery scenario
-                    
experimentSubscriber.sendAck(messageContext.getDeliveryTag());
-                } else {
-                    launchExperiment(expEvent.getExperimentId(), 
expEvent.getGatewayId());
-                    
experimentSubscriber.sendAck(messageContext.getDeliveryTag());
-                }
+                               terminateExperiment(expEvent.getExperimentId(), 
expEvent.getGatewayId());
                        } catch (TException e) {
-                               log.error("Experiment launch failed due to 
Thrift conversion error", e);
-                experimentSubscriber.sendAck(messageContext.getDeliveryTag());
+                               log.error("Experiment cancellation failed due 
to Thrift conversion error", e);
+                       }finally {
+                               
experimentSubscriber.sendAck(messageContext.getDeliveryTag());
                        }
+
+               }
+       }
+
+       private void launchExperiment(MessageContext messageContext) {
+               try {
+            byte[] bytes = 
ThriftUtils.serializeThriftObject(messageContext.getEvent());
+            ExperimentSubmitEvent expEvent = new ExperimentSubmitEvent();
+            ThriftUtils.createThriftFromBytes(bytes, expEvent);
+            if (messageContext.isRedeliver()) {
+                               ExperimentModel experimentModel = 
(ExperimentModel) experimentCatalog.
+                                               
get(ExperimentCatalogModelType.EXPERIMENT, expEvent.getExperimentId());
+                               if 
(experimentModel.getExperimentStatus().getState() == ExperimentState.CREATED) {
+                                       
launchExperiment(expEvent.getExperimentId(), expEvent.getGatewayId());
+                               }
+            } else {
+                launchExperiment(expEvent.getExperimentId(), 
expEvent.getGatewayId());
+            }
+               } catch (TException e) {
+            log.error("Experiment launch failed due to Thrift conversion 
error", e);
+               } catch (RegistryException e) {
+                       log.error("Experiment launch failed due to registry 
access issue", e);
+               }finally {
+                       
experimentSubscriber.sendAck(messageContext.getDeliveryTag());
                }
        }
 

Reply via email to