Repository: airavata
Updated Branches:
  refs/heads/master 9e5356049 -> bea823b43


Implemented the Orchestrator side of the experiment cancel request and 
refactored the ZooKeeper node paths. AIRAVATA-1798


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/e1356b73
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/e1356b73
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/e1356b73

Branch: refs/heads/master
Commit: e1356b73aaab299552e5992fddce932e9505304b
Parents: 4ba16d6
Author: Shameera Rathanyaka <[email protected]>
Authored: Thu Sep 3 19:57:17 2015 -0400
Committer: Supun Nakandala <[email protected]>
Committed: Sat Sep 5 12:24:21 2015 +0530

----------------------------------------------------------------------
 .../airavata/common/utils/zkConstants.java      |  32 ++++
 .../airavata/gfac/core/GFacConstants.java       |   8 +-
 .../apache/airavata/gfac/core/GFacUtils.java    |  15 +-
 .../impl/watcher/CancelRequestWatcherImpl.java  |   8 +-
 .../watcher/RedeliveryRequestWatcherImpl.java   |   2 +
 .../airavata/gfac/server/GfacServerHandler.java |  67 +++++---
 .../server/OrchestratorServerHandler.java       | 151 ++++---------------
 7 files changed, 122 insertions(+), 161 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/e1356b73/modules/commons/src/main/java/org/apache/airavata/common/utils/zkConstants.java
----------------------------------------------------------------------
diff --git 
a/modules/commons/src/main/java/org/apache/airavata/common/utils/zkConstants.java
 
b/modules/commons/src/main/java/org/apache/airavata/common/utils/zkConstants.java
new file mode 100644
index 0000000..9255e02
--- /dev/null
+++ 
b/modules/commons/src/main/java/org/apache/airavata/common/utils/zkConstants.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.common.utils;
+
+public interface ZkConstants {
+
+       public static final String ZOOKEEPER_SERVERS_NODE = "/servers";
+       public static final String ZOOKEEPER_GFAC_SERVER_NODE = "/gfac";
+       public static final String ZOOKEEPER_EXPERIMENT_NODE = "/experiments";
+       public static final String ZOOKEEPER_DELIVERYTAG_NODE = "/deliveryTag";
+       public static final String ZOOKEEPER_TOKEN_NODE = "/token";
+       public static final String ZOOKEEPER_CANCEL_LISTENER_NODE = 
"/cancelListener";
+       public static final String ZOOKEEPER_CANCEL_REQEUST = "CANCEL_REQUEST";
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/e1356b73/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConstants.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConstants.java
 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConstants.java
index b662fff..444956b 100644
--- 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConstants.java
+++ 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConstants.java
@@ -50,13 +50,7 @@ public class GFacConstants {
        public static final String _127_0_0_1 = "127.0.0.1";
        public static final String LOCALHOST = "localhost";
 
-       public static final String ZOOKEEPER_SERVERS_NODE = "/servers";
-       public static final String ZOOKEEPER_GFAC_SERVER_NODE = "/gfac";
-       public static final String ZOOKEEPER_EXPERIMENT_NODE = "/experiments";
-       public static final String ZOOKEEPER_DELIVERYTAG_NODE = "/deliveryTag";
-       public static final String ZOOKEEPER_TOKEN_NODE = "/token";
-       public static final String ZOOKEEPER_CANCEL_LISTENER_NODE = 
"/cancelListener";
-       public static final String ZOOKEEPER_CANCEL_REQEUST = "CANCEL_REQUEST";
+
 
        public static final String PROP_WORKFLOW_INSTANCE_ID = 
"workflow.instance.id";
        public static final String PROP_WORKFLOW_NODE_ID = "workflow.node.id";

http://git-wip-us.apache.org/repos/asf/airavata/blob/e1356b73/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
index 3ee0461..d3d4c7e 100644
--- 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
+++ 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
@@ -25,6 +25,7 @@ import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.AiravataZKUtils;
 import org.apache.airavata.common.utils.DBUtil;
 import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.common.utils.ZkConstants;
 import org.apache.airavata.credential.store.store.CredentialReader;
 import org.apache.airavata.credential.store.store.impl.CredentialReaderImpl;
 import org.apache.airavata.gfac.core.context.ProcessContext;
@@ -551,7 +552,7 @@ public class GFacUtils {
      * @throws InterruptedException
      */
     public static String findExperimentEntry(String experimentID, 
CuratorFramework curatorClient) throws Exception {
-        String experimentNode = GFacConstants.ZOOKEEPER_EXPERIMENT_NODE;
+        String experimentNode = ZkConstants.ZOOKEEPER_EXPERIMENT_NODE;
         List<String> children = 
curatorClient.getChildren().forPath(experimentNode);
         for (String pickedChild : children) {
             String experimentPath = experimentNode + File.separator + 
pickedChild;
@@ -568,9 +569,9 @@ public class GFacUtils {
 
     public static boolean setExperimentCancelRequest(String processId, 
CuratorFramework curatorClient, long
                    deliveryTag) throws Exception {
-           String experimentNode = 
ZKPaths.makePath(GFacConstants.ZOOKEEPER_EXPERIMENT_NODE, processId);
-           String cancelListenerNodePath = ZKPaths.makePath(experimentNode, 
GFacConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
-           
curatorClient.setData().withVersion(-1).forPath(cancelListenerNodePath, 
GFacConstants.ZOOKEEPER_CANCEL_REQEUST
+           String experimentNode = 
ZKPaths.makePath(ZkConstants.ZOOKEEPER_EXPERIMENT_NODE, processId);
+           String cancelListenerNodePath = ZKPaths.makePath(experimentNode, 
ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
+           
curatorClient.setData().withVersion(-1).forPath(cancelListenerNodePath, 
ZkConstants.ZOOKEEPER_CANCEL_REQEUST
                            .getBytes());
            return true;
     }
@@ -768,7 +769,7 @@ public class GFacUtils {
 //    }
 
     public static String getZKGfacServersParentPath() {
-        return ZKPaths.makePath(GFacConstants.ZOOKEEPER_SERVERS_NODE, 
GFacConstants.ZOOKEEPER_GFAC_SERVER_NODE);
+        return ZKPaths.makePath(ZkConstants.ZOOKEEPER_SERVERS_NODE, 
ZkConstants.ZOOKEEPER_GFAC_SERVER_NODE);
     }
 
     public static JobDescriptor createJobDescriptor(ProcessContext 
processContext) throws GFacException, AppCatalogException, 
ApplicationSettingsException {
@@ -1113,11 +1114,11 @@ public class GFacUtils {
     }
 
        public static String getExperimentNodePath(String experimentId) {
-               return GFacConstants.ZOOKEEPER_EXPERIMENT_NODE + File.separator 
+ experimentId;
+               return ZkConstants.ZOOKEEPER_EXPERIMENT_NODE + File.separator + 
experimentId;
        }
 
        public static long getProcessDeliveryTag(CuratorFramework 
curatorClient, String processId) throws Exception {
-               String deliveryTagPath = 
GFacConstants.ZOOKEEPER_EXPERIMENT_NODE + "/" + processId + GFacConstants
+               String deliveryTagPath = ZkConstants.ZOOKEEPER_EXPERIMENT_NODE 
+ "/" + processId + ZkConstants
                                .ZOOKEEPER_DELIVERYTAG_NODE;
                byte[] bytes = curatorClient.getData().forPath(deliveryTagPath);
                return GFacUtils.bytesToLong(bytes);

http://git-wip-us.apache.org/repos/asf/airavata/blob/e1356b73/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java
index bfeac89..58d2817 100644
--- 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java
@@ -20,6 +20,7 @@
  */
 package org.apache.airavata.gfac.impl.watcher;
 
+import org.apache.airavata.common.utils.ZkConstants;
 import org.apache.airavata.gfac.core.context.ProcessContext;
 import org.apache.airavata.gfac.core.watcher.CancelRequestWatcher;
 import org.apache.airavata.gfac.impl.Factory;
@@ -44,7 +45,7 @@ public class CancelRequestWatcherImpl implements 
CancelRequestWatcher {
                                byte[] bytes = 
curatorClient.getData().forPath(path);
                                String processId = 
path.substring(path.lastIndexOf("/") + 1);
                                String action = new String(bytes);
-                               if (action.equalsIgnoreCase("CANCEL")) {
+                               if 
(action.equalsIgnoreCase(ZkConstants.ZOOKEEPER_CANCEL_REQEUST)) {
                                        ProcessContext processContext = 
Factory.getGfacContext().getProcess(processId);
                                        if (processContext != null) {
                                                processContext.setCancel(true);
@@ -56,9 +57,12 @@ public class CancelRequestWatcherImpl implements 
CancelRequestWatcher {
                                        
curatorClient.getData().usingWatcher(this).forPath(path);
                                }
                                break;
+                       case NodeDeleted:
+                               //end of experiment execution, ignore this event
+                               break;
                        case NodeCreated:
                        case NodeChildrenChanged:
-                       case NodeDeleted:
+                       case None:
                                
curatorClient.getData().usingWatcher(this).forPath(path);
                                break;
                        default:

http://git-wip-us.apache.org/repos/asf/airavata/blob/e1356b73/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/RedeliveryRequestWatcherImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/RedeliveryRequestWatcherImpl.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/RedeliveryRequestWatcherImpl.java
index 4738edb..dc5317f 100644
--- 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/RedeliveryRequestWatcherImpl.java
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/RedeliveryRequestWatcherImpl.java
@@ -58,6 +58,8 @@ public class RedeliveryRequestWatcherImpl implements 
RedeliveryRequestWatcher {
                                }
                                break;
                        case NodeDeleted:
+                               //end of experiment execution, ignore this event
+                               break;
                        case NodeCreated:
                        case NodeChildrenChanged:
                        case None:

http://git-wip-us.apache.org/repos/asf/airavata/blob/e1356b73/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
 
b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 9ebfa05..1040b05 100644
--- 
a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ 
b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -26,6 +26,7 @@ import 
org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.common.utils.ZkConstants;
 import org.apache.airavata.common.utils.listener.AbstractActivityListener;
 import org.apache.airavata.gfac.core.GFacConstants;
 import org.apache.airavata.gfac.core.GFacException;
@@ -54,6 +55,7 @@ import org.apache.thrift.TBase;
 import org.apache.thrift.TException;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -116,7 +118,7 @@ public class GfacServerHandler implements GfacService.Iface 
{
         airavataServerHostPort = ServerSettings.getGfacServerHost() + ":" + 
ServerSettings.getGFacServerPort();
         // create PERSISTENT nodes
         ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), 
GFacUtils.getZKGfacServersParentPath());
-        ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), 
GFacConstants.ZOOKEEPER_EXPERIMENT_NODE);
+        ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), 
ZkConstants.ZOOKEEPER_EXPERIMENT_NODE);
         // create EPHEMERAL server name node
         String gfacName = ServerSettings.getGFacServerName();
         if 
(curatorClient.checkExists().forPath(ZKPaths.makePath(GFacUtils.getZKGfacServersParentPath()
 ,gfacName)) == null) {
@@ -196,7 +198,7 @@ public class GfacServerHandler implements GfacService.Iface 
{
         private String gfacServerName;
 
         public ProcessLaunchMessageHandler() throws 
ApplicationSettingsException {
-            experimentNode = GFacConstants.ZOOKEEPER_EXPERIMENT_NODE;
+            experimentNode = ZkConstants.ZOOKEEPER_EXPERIMENT_NODE;
             gfacServerName = ServerSettings.getGFacServerName();
         }
 
@@ -226,8 +228,7 @@ public class GfacServerHandler implements GfacService.Iface 
{
                                if 
(Factory.getGfacContext().getProcess(event.getProcessId()) != null) {
                                        // update deliver tag
                                        try {
-                                               
updateDeliveryTag(curatorClient, gfacServerName, event.getProcessId(), message
-                                                               
.getDeliveryTag());
+                                               
updateDeliveryTag(curatorClient, gfacServerName, event, message );
                                                return;
                                        } catch (Exception e) {
                                                log.error("Error while updating 
delivery tag for redelivery message , messageId : " +
@@ -254,8 +255,7 @@ public class GfacServerHandler implements GfacService.Iface 
{
                                        .getProcessId());
                        publishProcessStatus(event, status);
                     try {
-                           createProcessZKNode(curatorClient, gfacServerName, 
event.getProcessId(), message
-                                           .getDeliveryTag(), 
event.getTokenId());
+                           createProcessZKNode(curatorClient, gfacServerName, 
event, message);
                            submitProcess(event.getProcessId(), 
event.getGatewayId(), event.getTokenId());
                     } catch (Exception e) {
                         log.error(e.getMessage(), e);
@@ -306,38 +306,55 @@ public class GfacServerHandler implements 
GfacService.Iface {
                statusPublisher.publish(msgCtx);
        }
 
-       private void createProcessZKNode(CuratorFramework curatorClient, String 
gfacServerName, String
-                       processId, long deliveryTag, String token) throws 
Exception {
-               // TODO - To handle multiple processes per experiment, need to 
create a /experiments/{expId}/{processId} node
-               // create /experiments/{processId} node and set data - 
serverName, add redelivery listener
-               String zkProcessNodePath = 
ZKPaths.makePath(GFacConstants.ZOOKEEPER_EXPERIMENT_NODE, processId);
+       private void createProcessZKNode(CuratorFramework curatorClient, String 
gfacServerName,ProcessSubmitEvent event
+                       ,MessageContext messageContext) throws Exception {
+               String processId  = event.getProcessId();
+               String token = event.getTokenId();
+               String experimentId = event.getExperimentId();
+               long deliveryTag = messageContext.getDeliveryTag();
+
+               // create /experiments/{experimentId}/{processId} node and set 
data - serverName, add redelivery listener
+               String experimentNodePath = 
ZkConstants.ZOOKEEPER_EXPERIMENT_NODE + "/" + experimentId;
+               String zkProcessNodePath = ZKPaths.makePath(experimentNodePath, 
processId);
                
ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), 
zkProcessNodePath);
                
curatorClient.setData().withVersion(-1).forPath(zkProcessNodePath, 
gfacServerName.getBytes());
                
curatorClient.getData().usingWatcher(Factory.getRedeliveryReqeustWatcher()).forPath(zkProcessNodePath);
 
-               // create /experiments/{processId}/deliveryTag node and set 
data - deliveryTag
-               String deliveryTagPath = ZKPaths.makePath(zkProcessNodePath, 
GFacConstants.ZOOKEEPER_DELIVERYTAG_NODE);
+               // create /experiments/{experimentId}/{processId}/deliveryTag 
node and set data - deliveryTag
+               String deliveryTagPath = ZKPaths.makePath(zkProcessNodePath, 
ZkConstants.ZOOKEEPER_DELIVERYTAG_NODE);
                
ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), 
deliveryTagPath);
                
curatorClient.setData().withVersion(-1).forPath(deliveryTagPath, 
GFacUtils.longToBytes(deliveryTag));
 
-               // create /experiments/{processId}/token node and set data - 
token
-               String tokenNodePath = ZKPaths.makePath(processId, 
GFacConstants.ZOOKEEPER_TOKEN_NODE);
+               // create /experiments/{experimentId}/{processId}/token node 
and set data - token
+               String tokenNodePath = ZKPaths.makePath(zkProcessNodePath, 
ZkConstants.ZOOKEEPER_TOKEN_NODE);
                
ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), 
tokenNodePath);
                curatorClient.setData().withVersion(-1).forPath(tokenNodePath, 
token.getBytes());
 
-               // create /experiments/{processId}/cancelListener node and set 
watcher for data changes
-               String cancelListenerNode = ZKPaths.makePath(zkProcessNodePath, 
GFacConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
+               // create 
/experiments/{experimentId}/{processId}/cancelListener node and set watcher for 
data changes
+/*             String cancelListenerNode = ZKPaths.makePath(zkProcessNodePath, 
GFacConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
                
ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), 
cancelListenerNode);
-               
curatorClient.getData().usingWatcher(Factory.getCancelRequestWatcher()).forPath(cancelListenerNode);
+               
curatorClient.getData().usingWatcher(Factory.getCancelRequestWatcher()).forPath(cancelListenerNode);*/
+
+               // create /experiments/{experimentId}/cancelListener node and set 
watcher for data changes
+               String experimentCancelNode = experimentNodePath + "/" + 
ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE;
+               
ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), 
experimentCancelNode);
+               
curatorClient.getData().usingWatcher(Factory.getCancelRequestWatcher()).forPath 
(experimentCancelNode);
+
        }
 
-       private void updateDeliveryTag(CuratorFramework curatorClient, String 
gfacServerName, String processId, long
-                       deliveryTag) throws Exception {
-               // create /experiments/{processId} node and set data - 
serverName, add redelivery listener
-               String zkProcessNodePath = 
ZKPaths.makePath(GFacConstants.ZOOKEEPER_EXPERIMENT_NODE, processId);
-               // create /experiments/{processId}/deliveryTag node and set 
data - deliveryTag
-               String deliveryTagPath = ZKPaths.makePath(zkProcessNodePath, 
GFacConstants.ZOOKEEPER_DELIVERYTAG_NODE);
-               
curatorClient.setData().withVersion(-1).forPath(deliveryTagPath, 
GFacUtils.longToBytes(deliveryTag));
+       private void updateDeliveryTag(CuratorFramework curatorClient, String 
gfacServerName, ProcessSubmitEvent event,
+                                      MessageContext messageContext) throws 
Exception {
+               String experimentId = event.getExperimentId();
+               String processId = event.getProcessId();
+               long deliveryTag = messageContext.getDeliveryTag();
+               String processNodePath = 
ZKPaths.makePath(ZKPaths.makePath(ZkConstants.ZOOKEEPER_EXPERIMENT_NODE,
+                               experimentId), processId);
+               Stat stat = 
curatorClient.checkExists().forPath(processNodePath);
+               if (stat != null) {
+                       // update /experiments/{experimentId}/{processId}/deliveryTag node - 
set data to the new deliveryTag
+                       String deliveryTagPath = 
ZKPaths.makePath(processNodePath, ZkConstants.ZOOKEEPER_DELIVERYTAG_NODE);
+                       
curatorClient.setData().withVersion(-1).forPath(deliveryTagPath, 
GFacUtils.longToBytes(deliveryTag));
+               }
        }
 
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/e1356b73/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
 
b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index 8427d0c..acb5530 100644
--- 
a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ 
b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -26,6 +26,7 @@ import 
org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.common.utils.ZkConstants;
 import org.apache.airavata.credential.store.store.CredentialReader;
 import org.apache.airavata.gfac.core.GFacUtils;
 import org.apache.airavata.gfac.core.scheduler.HostScheduler;
@@ -59,8 +60,14 @@ import 
org.apache.airavata.registry.core.app.catalog.resources.AppCatAbstractRes
 import 
org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
 import 
org.apache.airavata.registry.core.experiment.catalog.resources.AbstractExpCatResource;
 import org.apache.airavata.registry.cpi.*;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.utils.ZKPaths;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TException;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -80,6 +87,7 @@ public class OrchestratorServerHandler implements 
OrchestratorService.Iface {
        private String gatewayName;
        private Publisher publisher;
        private RabbitMQStatusConsumer statusConsumer;
+       private CuratorFramework curatorClient;
 
     /**
         * Query orchestrator server to fetch the CPI version
@@ -109,6 +117,7 @@ public class OrchestratorServerHandler implements 
OrchestratorService.Iface {
                        String exchangeName = 
ServerSettings.getSetting(MessagingConstants.RABBITMQ_STATUS_EXCHANGE_NAME);
                        statusConsumer = new RabbitMQStatusConsumer(brokerUrl, 
exchangeName);
                        statusConsumer.listen(new ProcessStatusHandler());
+                       startCurator();
                } catch (OrchestratorException | RegistryException | 
AppCatalogException | AiravataException e) {
                        log.error(e.getMessage(), e);
                        throw new OrchestratorException("Error while 
initializing orchestrator service", e);
@@ -209,7 +218,12 @@ public class OrchestratorServerHandler implements 
OrchestratorService.Iface {
         */
        public boolean terminateExperiment(String experimentId, String tokenId) 
throws TException {
         log.info(experimentId, "Experiment: {} is cancelling  !!!!!", 
experimentId);
-        return validateStatesAndCancel(experimentId, tokenId);
+               try {
+                       return validateStatesAndCancel(experimentId, tokenId);
+               } catch (Exception e) {
+                       log.error("expId : " + experimentId + " :- Error while 
cancelling experiment", e);
+                       return false;
+               }
        }
 
        private String getAiravataUserName() {
@@ -277,7 +291,7 @@ public class OrchestratorServerHandler implements 
OrchestratorService.Iface {
         List<ComputeResourceDescription> computeHostList = 
Arrays.asList(deploymentMap.keySet().toArray(new 
ComputeResourceDescription[]{}));
         Class<? extends HostScheduler> aClass = Class.forName(
                 ServerSettings.getHostScheduler()).asSubclass(
-                HostScheduler.class);
+                       HostScheduler.class);
         HostScheduler hostScheduler = aClass.newInstance();
         ComputeResourceDescription ComputeResourceDescription = 
hostScheduler.schedule(computeHostList);
         return deploymentMap.get(ComputeResourceDescription);
@@ -297,124 +311,15 @@ public class OrchestratorServerHandler implements 
OrchestratorService.Iface {
                return selectedModuleId;
        }
 
-    private boolean validateStatesAndCancel(String experimentId, String 
tokenId)throws TException{
-        // FIXME
-//        try {
-//            Experiment experiment = (Experiment) experimentCatalog.get(
-//                    ExperimentCatalogModelType.EXPERIMENT, experimentId);
-//                     log.info("Waiting for zookeeper to connect to the 
server");
-//                     synchronized (mutex){
-//                             mutex.wait(5000);
-//                     }
-//            if (experiment == null) {
-//                log.errorId(experimentId, "Error retrieving the Experiment 
by the given experimentID: {}.", experimentId);
-//                throw new OrchestratorException("Error retrieving the 
Experiment by the given experimentID: " + experimentId);
-//            }
-//            ExperimentState experimentState = 
experiment.getExperimentStatus().getExperimentState();
-//            if (isCancelValid(experimentState)){
-//                ExperimentStatus status = new ExperimentStatus();
-//                status.setExperimentState(ExperimentState.CANCELING);
-//                status.setTimeOfStateChange(Calendar.getInstance()
-//                        .getTimeInMillis());
-//                experiment.setExperimentStatus(status);
-//                
experimentCatalog.update(ExperimentCatalogModelType.EXPERIMENT, experiment,
-//                        experimentId);
-//
-//                List<String> ids = experimentCatalog.getIds(
-//                        ExperimentCatalogModelType.WORKFLOW_NODE_DETAIL,
-//                        WorkflowNodeConstants.EXPERIMENT_ID, experimentId);
-//                for (String workflowNodeId : ids) {
-//                    WorkflowNodeDetails workflowNodeDetail = 
(WorkflowNodeDetails) experimentCatalog
-//                            
.get(ExperimentCatalogModelType.WORKFLOW_NODE_DETAIL,
-//                                    workflowNodeId);
-//                    int value = 
workflowNodeDetail.getWorkflowNodeStatus().getWorkflowNodeState().getValue();
-//                    if ( value> 1 && value < 7) { // we skip the unknown 
state
-//                        log.error(workflowNodeDetail.getNodeName() + " 
Workflow Node status cannot mark as cancelled, because " +
-//                                "current status is " + 
workflowNodeDetail.getWorkflowNodeStatus().getWorkflowNodeState().toString());
-//                        continue; // this continue is very useful not to 
process deeper loops if the upper layers have non-cancel states
-//                    } else {
-//                        WorkflowNodeStatus workflowNodeStatus = new 
WorkflowNodeStatus();
-//                        
workflowNodeStatus.setWorkflowNodeState(WorkflowNodeState.CANCELING);
-//                        
workflowNodeStatus.setTimeOfStateChange(Calendar.getInstance()
-//                                .getTimeInMillis());
-//                        
workflowNodeDetail.setWorkflowNodeStatus(workflowNodeStatus);
-//                        
experimentCatalog.update(ExperimentCatalogModelType.WORKFLOW_NODE_DETAIL, 
workflowNodeDetail,
-//                                workflowNodeId);
-//                    }
-//                    List<Object> taskDetailList = experimentCatalog.get(
-//                            ExperimentCatalogModelType.TASK_DETAIL,
-//                            TaskDetailConstants.NODE_ID, workflowNodeId);
-//                    for (Object o : taskDetailList) {
-//                        TaskDetails taskDetails = (TaskDetails) o;
-//                        TaskStatus taskStatus = ((TaskDetails) 
o).getTaskStatus();
-//                        if (taskStatus.getExecutionState().getValue() > 7 && 
taskStatus.getExecutionState().getValue()<12) {
-//                            log.error(((TaskDetails) o).getTaskID() + " Task 
status cannot mark as cancelled, because " +
-//                                    "current task state is " + 
((TaskDetails) o).getTaskStatus().getExecutionState().toString());
-//                            continue;// this continue is very useful not to 
process deeper loops if the upper layers have non-cancel states
-//                        } else {
-//                            
taskStatus.setExecutionState(TaskState.CANCELING);
-//                            
taskStatus.setTimeOfStateChange(Calendar.getInstance()
-//                                    .getTimeInMillis());
-//                            taskDetails.setTaskStatus(taskStatus);
-//                            
experimentCatalog.update(ExperimentCatalogModelType.TASK_DETAIL, o,
-//                                    taskDetails.getTaskID());
-//                        }
-//                        orchestrator.cancelExperiment(experiment,
-//                                workflowNodeDetail, taskDetails, tokenId);
-//                        // Status update should be done at the monitor
-//                    }
-//                }
-//            }else {
-//                if (isCancelAllowed(experimentState)){
-//                    // when experiment status is < 3 no jobDetails object is 
created,
-//                    // so we don't have to worry, we simply have to change 
the status and stop the execution
-//                    ExperimentStatus status = new ExperimentStatus();
-//                    status.setExperimentState(ExperimentState.CANCELED);
-//                    status.setTimeOfStateChange(Calendar.getInstance()
-//                            .getTimeInMillis());
-//                    experiment.setExperimentStatus(status);
-//                    
experimentCatalog.update(ExperimentCatalogModelType.EXPERIMENT, experiment,
-//                            experimentId);
-//                    List<String> ids = experimentCatalog.getIds(
-//                            ExperimentCatalogModelType.WORKFLOW_NODE_DETAIL,
-//                            WorkflowNodeConstants.EXPERIMENT_ID, 
experimentId);
-//                    for (String workflowNodeId : ids) {
-//                        WorkflowNodeDetails workflowNodeDetail = 
(WorkflowNodeDetails) experimentCatalog
-//                                
.get(ExperimentCatalogModelType.WORKFLOW_NODE_DETAIL,
-//                                        workflowNodeId);
-//                        WorkflowNodeStatus workflowNodeStatus = new 
WorkflowNodeStatus();
-//                        
workflowNodeStatus.setWorkflowNodeState(WorkflowNodeState.CANCELED);
-//                        
workflowNodeStatus.setTimeOfStateChange(Calendar.getInstance()
-//                                .getTimeInMillis());
-//                        
workflowNodeDetail.setWorkflowNodeStatus(workflowNodeStatus);
-//                        
experimentCatalog.update(ExperimentCatalogModelType.WORKFLOW_NODE_DETAIL, 
workflowNodeDetail,
-//                                workflowNodeId);
-//                        List<Object> taskDetailList = experimentCatalog.get(
-//                                ExperimentCatalogModelType.TASK_DETAIL,
-//                                TaskDetailConstants.NODE_ID, workflowNodeId);
-//                        for (Object o : taskDetailList) {
-//                            TaskDetails taskDetails = (TaskDetails) o;
-//                            TaskStatus taskStatus = ((TaskDetails) 
o).getTaskStatus();
-//                            taskStatus.setExecutionState(TaskState.CANCELED);
-//                            
taskStatus.setTimeOfStateChange(Calendar.getInstance()
-//                                    .getTimeInMillis());
-//                            taskDetails.setTaskStatus(taskStatus);
-//                            
experimentCatalog.update(ExperimentCatalogModelType.TASK_DETAIL, o,
-//                                    taskDetails);
-//                        }
-//                    }
-//                }else {
-//                    log.errorId(experimentId, "Unable to mark experiment as 
Cancelled, current state {} doesn't allow to cancel the experiment {}.",
-//                            
experiment.getExperimentStatus().getExperimentState().toString(), experimentId);
-//                    throw new OrchestratorException("Unable to mark 
experiment as Cancelled, because current state is: "
-//                            + 
experiment.getExperimentStatus().getExperimentState().toString());
-//                }
-//            }
-//            log.info("Experiment: " + experimentId + " is cancelled !!!!!");
-//        } catch (Exception e) {
-//            throw new TException(e);
-//        }
-        return true;
+    /**
+     * Signals a cancel request for the given experiment through ZooKeeper.
+     * <p>
+     * Builds the experiment's cancel-listener node path and, only when that node exists,
+     * overwrites its data with the cancel-request marker.
+     *
+     * @param experimentId id of the experiment to cancel; used as a segment of the node path
+     * @param tokenId      not used by this implementation
+     * @return always {@code true}, including when the cancel node is missing and nothing was written
+     * @throws Exception if the ZooKeeper existence check or the data update fails
+     */
+    private boolean validateStatesAndCancel(String experimentId, String tokenId) throws Exception {
+           String experimentNodePath = ZKPaths.makePath(ZkConstants.ZOOKEEPER_EXPERIMENT_NODE, experimentId);
+           String expCancelNodePath = ZKPaths.makePath(experimentNodePath,
+                           ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
+           Stat stat = curatorClient.checkExists().forPath(expCancelNodePath);
+           if (stat == null) {
+                   // No cancel node to write to: make the skipped request visible in the log.
+                   // The return value is deliberately left unchanged for existing callers.
+                   log.warn("expId : " + experimentId + " :- Cancel node " + expCancelNodePath
+                                   + " does not exist, cancel request was not written");
+                   return true;
+           }
+           // Encode with an explicit charset so the bytes do not depend on the platform default.
+           byte[] cancelRequest = ZkConstants.ZOOKEEPER_CANCEL_REQEUST
+                           .getBytes(java.nio.charset.StandardCharsets.UTF_8);
+           // NOTE(review): version -1 is expected to match any node version - confirm against ZooKeeper docs.
+           curatorClient.setData().withVersion(-1).forPath(expCancelNodePath, cancelRequest);
+           return true;
     }
 
     private void launchWorkflowExperiment(String experimentId, String 
airavataCredStoreToken) throws TException {
@@ -427,6 +332,12 @@ public class OrchestratorServerHandler implements 
OrchestratorService.Iface {
 //        }
     }
 
+       /**
+        * Creates the Curator client used by this handler and starts it.
+        * <p>
+        * The connection string is read from {@link ServerSettings}; the client is stored in
+        * {@code curatorClient}.
+        *
+        * @throws ApplicationSettingsException if the ZooKeeper connection setting cannot be read
+        */
+       private void startCurator() throws ApplicationSettingsException {
+               String zkConnectString = ServerSettings.getZookeeperConnection();
+               // Exponential backoff retry policy constructed with arguments (1000, 5).
+               curatorClient = CuratorFrameworkFactory.newClient(zkConnectString,
+                               new ExponentialBackoffRetry(1000, 5));
+               curatorClient.start();
+       }
     private class SingleAppExperimentRunner implements Runnable {
 
         String experimentId;

Reply via email to