Repository: airavata
Updated Branches:
  refs/heads/master 749ad9330 -> ac3be7ae4


Handled TerminatTask and send ack to rabbitMQ


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/ac3be7ae
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/ac3be7ae
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/ac3be7ae

Branch: refs/heads/master
Commit: ac3be7ae4f057876641fcf90725566b02bb1e312
Parents: 749ad93
Author: Shameera Rathanyaka <[email protected]>
Authored: Wed Jun 17 17:22:32 2015 -0400
Committer: Shameera Rathanyaka <[email protected]>
Committed: Wed Jun 17 17:22:32 2015 -0400

----------------------------------------------------------------------
 .../airavata/gfac/core/GFacConstants.java       |   2 +
 .../apache/airavata/gfac/core/GFacUtils.java    |  59 ++++++----
 .../airavata/gfac/server/GfacServerHandler.java | 108 ++++++-------------
 3 files changed, 72 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/ac3be7ae/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConstants.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConstants.java
 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConstants.java
index 2706f27..b662fff 100644
--- 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConstants.java
+++ 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConstants.java
@@ -54,7 +54,9 @@ public class GFacConstants {
        public static final String ZOOKEEPER_GFAC_SERVER_NODE = "/gfac";
        public static final String ZOOKEEPER_EXPERIMENT_NODE = "/experiments";
        public static final String ZOOKEEPER_DELIVERYTAG_NODE = "/deliveryTag";
+       public static final String ZOOKEEPER_TOKEN_NODE = "/token";
        public static final String ZOOKEEPER_CANCEL_LISTENER_NODE = 
"/cancelListener";
+       public static final String ZOOKEEPER_CANCEL_REQEUST = "CANCEL_REQUEST";
 
        public static final String PROP_WORKFLOW_INSTANCE_ID = 
"workflow.instance.id";
        public static final String PROP_WORKFLOW_NODE_ID = "workflow.node.id";

http://git-wip-us.apache.org/repos/asf/airavata/blob/ac3be7ae/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
index 63cc093..e630570 100644
--- 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
+++ 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
@@ -28,6 +28,8 @@ import 
org.apache.airavata.credential.store.store.CredentialReader;
 import org.apache.airavata.credential.store.store.impl.CredentialReaderImpl;
 import org.apache.airavata.gfac.core.context.JobExecutionContext;
 import org.apache.airavata.gfac.core.context.ProcessContext;
+import org.apache.airavata.gfac.core.watcher.CancelRequestWatcher;
+import org.apache.airavata.gfac.core.watcher.RedeliveryRequestWatcher;
 import 
org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
 import 
org.apache.airavata.model.appcatalog.appdeployment.ApplicationParallelismType;
 import org.apache.airavata.model.appcatalog.computeresource.*;
@@ -497,26 +499,13 @@ public class GFacUtils {
         return null;
     }
 
-    public static boolean setExperimentCancel(String experimentId, 
CuratorFramework curatorClient, long deliveryTag) throws Exception {
-        String experimentEntry = GFacUtils.findExperimentEntry(experimentId, 
curatorClient);
-        if (experimentEntry == null) {
-            // This should be handle in validation request. Gfac shouldn't get 
any invalidate experiment.
-            log.error("Cannot find the experiment Entry, so cancel operation 
cannot be performed. " +
-                    "This happen when experiment completed and already removed 
from the zookeeper");
-            return false;
-        } else {
-            // check cancel operation is being processed for the same 
experiment.
-            Stat cancelState = 
curatorClient.checkExists().forPath(experimentEntry + 
AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX);
-            if (cancelState != null) {
-                // another cancel operation is being processed. only one 
cancel operation can exist for a given experiment.
-                return false;
-            }
-
-            
curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
-                    .forPath(experimentEntry + 
AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX, longToBytes(deliveryTag)); // save 
cancel delivery tag to be acknowledge at the end.
-            return true;
-        }
-
+    public static boolean setExperimentCancelRequest(String experimentId, 
CuratorFramework curatorClient, long
+                   deliveryTag) throws Exception {
+           String experimentNode = 
ZKPaths.makePath(GFacConstants.ZOOKEEPER_EXPERIMENT_NODE, experimentId);
+           String cancelListenerNodePath = ZKPaths.makePath(experimentNode, 
GFacConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
+           
curatorClient.setData().withVersion(-1).forPath(cancelListenerNodePath, 
GFacConstants.ZOOKEEPER_CANCEL_REQEUST
+                           .getBytes());
+           return true;
     }
 
     public static boolean isCancelled(String experimentID, CuratorFramework 
curatorClient) throws Exception {
@@ -712,7 +701,7 @@ public class GFacUtils {
 //    }
 
     public static String getZKGfacServersParentPath() {
-        return GFacConstants.ZOOKEEPER_SERVERS_NODE + 
GFacConstants.ZOOKEEPER_GFAC_SERVER_NODE;
+        return ZKPaths.makePath(GFacConstants.ZOOKEEPER_SERVERS_NODE, 
GFacConstants.ZOOKEEPER_GFAC_SERVER_NODE);
     }
 
     public static JobDescriptor createJobDescriptor(ProcessContext 
processContext) throws GFacException, AppCatalogException, 
ApplicationSettingsException {
@@ -1033,4 +1022,32 @@ public class GFacUtils {
             throw new GFacException("Error occurred while creating the temp 
job script file", e);
         }
     }
+
+       public static String getExperimentNodePath(String experimentId) {
+               return GFacConstants.ZOOKEEPER_EXPERIMENT_NODE + File.separator 
+ experimentId;
+       }
+
+       public static void createExperimentNode(CuratorFramework curatorClient, 
String gfacServerName, String
+                       experimentId, long deliveryTag, String token) throws 
Exception {
+               // create /experiments/experimentId node and set data - 
serverName, add redelivery listener
+               String experimentPath = 
ZKPaths.makePath(GFacConstants.ZOOKEEPER_EXPERIMENT_NODE, experimentId);
+               
ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), 
experimentPath);
+               curatorClient.setData().withVersion(-1).forPath(experimentPath, 
gfacServerName.getBytes());
+               curatorClient.getData().usingWatcher(new 
RedeliveryRequestWatcher()).forPath(experimentPath);
+
+               // create /experiments/experimentId/deliveryTag node and set 
data - deliveryTag
+               String deliveryTagPath = ZKPaths.makePath(experimentPath, 
GFacConstants.ZOOKEEPER_DELIVERYTAG_NODE);
+               
ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), 
deliveryTagPath);
+               
curatorClient.setData().withVersion(-1).forPath(deliveryTagPath, 
GFacUtils.longToBytes(deliveryTag));
+
+               // create /experiments/experimentId/token node and set data - 
token
+               String tokenNodePath = ZKPaths.makePath(experimentId, 
GFacConstants.ZOOKEEPER_TOKEN_NODE);
+               
ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), 
tokenNodePath);
+               curatorClient.setData().withVersion(-1).forPath(tokenNodePath, 
token.getBytes());
+
+               // create /experiments/experimentId/cancelListener node and set 
watcher for data changes
+               String cancelListenerNode = ZKPaths.makePath(experimentPath, 
GFacConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
+               
ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), 
cancelListenerNode);
+               curatorClient.getData().usingWatcher(new 
CancelRequestWatcher()).forPath(cancelListenerNode);
+       }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/ac3be7ae/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
 
b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index e8037a4..a4a4874 100644
--- 
a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ 
b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -28,13 +28,11 @@ import org.apache.airavata.common.utils.LocalEventPublisher;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.ThriftUtils;
 import org.apache.airavata.common.utils.listener.AbstractActivityListener;
-import org.apache.airavata.gfac.core.GFac;
 import org.apache.airavata.gfac.core.GFacConstants;
 import org.apache.airavata.gfac.core.GFacException;
 import org.apache.airavata.gfac.core.GFacUtils;
 import org.apache.airavata.gfac.cpi.GfacService;
 import org.apache.airavata.gfac.cpi.gfac_cpi_serviceConstants;
-import org.apache.airavata.gfac.impl.BetterGfacImpl;
 import org.apache.airavata.gfac.impl.Factory;
 import org.apache.airavata.gfac.impl.GFacWorker;
 import org.apache.airavata.messaging.core.MessageContext;
@@ -119,15 +117,14 @@ public class GfacServerHandler implements 
GfacService.Iface {
         ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), 
GFacConstants.ZOOKEEPER_EXPERIMENT_NODE);
         // create EPHEMERAL server name node
         String gfacName = ServerSettings.getGFacServerName();
-        if 
(curatorClient.checkExists().forPath(GFacUtils.getZKGfacServersParentPath() + 
(gfacName.startsWith("/") ?
-                gfacName : "/" + gfacName)) == null) {
-            
curatorClient.create().withMode(CreateMode.EPHEMERAL).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
-                    .forPath(GFacUtils.getZKGfacServersParentPath() + 
(gfacName.startsWith("/") ? gfacName : "/" +
-                            gfacName));
+        if 
(curatorClient.checkExists().forPath(ZKPaths.makePath(GFacUtils.getZKGfacServersParentPath()
 ,gfacName)) == null) {
+               
curatorClient.create().withMode(CreateMode.EPHEMERAL).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
+                               
.forPath(ZKPaths.makePath(GFacUtils.getZKGfacServersParentPath(), gfacName));
 
         }
-        
curatorClient.setData().withVersion(-1).forPath(GFacUtils.getZKGfacServersParentPath()
 +
-                (gfacName.startsWith("/") ? gfacName : "/" + gfacName), new 
String(airavataServerHostPort).getBytes());
+           
curatorClient.setData().withVersion(-1).forPath(ZKPaths.makePath(GFacUtils.getZKGfacServersParentPath(),
+                           gfacName), airavataServerHostPort.getBytes());
+
     }
 
     public String getGFACServiceVersion() throws TException {
@@ -159,16 +156,18 @@ public class GfacServerHandler implements 
GfacService.Iface {
                 processId);
 
         try {
-               executorService.execute(new GFacWorker(experimentId,processId, 
gatewayId, tokenId));
+               executorService.execute(new GFacWorker(experimentId, processId, 
gatewayId, tokenId));
         } catch (GFacException e) {
             log.error("Failed to submit process", e);
             return false;
+        } catch (Exception e) {
+               log.error("Error creating zookeeper nodes");
         }
            return true;
     }
 
     public boolean cancelJob(String experimentId, String taskId, String 
gatewayId, String tokenId) throws TException {
-        log.info(experimentId, "GFac Received cancel job request for 
Experiment: {} TaskId: {} ", experimentId, taskId);
+    /*    log.info(experimentId, "GFac Received cancel job request for 
Experiment: {} TaskId: {} ", experimentId, taskId);
         try {
             if (BetterGfacImpl.getInstance().cancel(experimentId, taskId, 
gatewayId, tokenId)) {
                 log.debug(experimentId, "Successfully cancelled job, 
experiment {} , task {}", experimentId, taskId);
@@ -180,22 +179,17 @@ public class GfacServerHandler implements 
GfacService.Iface {
         } catch (Exception e) {
             log.error(experimentId, "Error cancelling the experiment {}.", 
experimentId);
             throw new TException("Error cancelling the experiment : " + 
e.getMessage(), e);
-        }
+        }*/
+           return false;
     }
 
-    public ExperimentCatalog getExperimentCatalog() {
-        return experimentCatalog;
-    }
 
-    public void setExperimentCatalog(ExperimentCatalog experimentCatalog) {
-        this.experimentCatalog = experimentCatalog;
-    }
 
 
     public static void startStatusUpdators(ExperimentCatalog 
experimentCatalog, CuratorFramework curatorClient, LocalEventPublisher 
publisher,
 
                                            RabbitMQTaskLaunchConsumer 
rabbitMQTaskLaunchConsumer) {
-        try {
+       /* try {
             String[] listenerClassList = ServerSettings.getActivityListeners();
             Publisher rabbitMQPublisher = 
PublisherFactory.createActivityPublisher();
             for (String listenerClass : listenerClassList) {
@@ -208,42 +202,16 @@ public class GfacServerHandler implements 
GfacService.Iface {
             }
         } catch (Exception e) {
             log.error("Error loading the listener classes configured in 
airavata-server.properties", e);
-        }
-    }
-    private static  class TestHandler implements MessageHandler{
-        @Override
-        public Map<String, Object> getProperties() {
-            Map<String, Object> props = new HashMap<String, Object>();
-            ArrayList<String> keys = new ArrayList<String>();
-            keys.add(ServerSettings.getLaunchQueueName());
-            keys.add(ServerSettings.getCancelQueueName());
-            props.put(MessagingConstants.RABBIT_ROUTING_KEY, keys);
-            props.put(MessagingConstants.RABBIT_QUEUE, 
ServerSettings.getLaunchQueueName());
-            return props;
-        }
-
-        @Override
-        public void onMessage(MessageContext message) {
-            TaskSubmitEvent event = new TaskSubmitEvent();
-            TBase messageEvent = message.getEvent();
-            byte[] bytes = new byte[0];
-            try {
-                bytes = ThriftUtils.serializeThriftObject(messageEvent);
-                ThriftUtils.createThriftFromBytes(bytes, event);
-                System.out.println(event.getExperimentId());
-            } catch (TException e) {
-                log.error(e.getMessage(), e);
-            }
-        }
+        }*/
     }
 
     private class TaskLaunchMessageHandler implements MessageHandler {
         private String experimentNode;
-        private String nodeName;
+        private String gfacServerName;
 
         public TaskLaunchMessageHandler() throws ApplicationSettingsException {
             experimentNode = GFacConstants.ZOOKEEPER_EXPERIMENT_NODE;
-            nodeName = ServerSettings.getGFacServerName();
+            gfacServerName = ServerSettings.getGFacServerName();
         }
 
         public Map<String, Object> getProperties() {
@@ -271,9 +239,8 @@ public class GfacServerHandler implements GfacService.Iface 
{
                     
status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
                     
experimentCatalog.update(ExperimentCatalogModelType.EXPERIMENT_STATUS, status, 
event.getExperimentId());
                     try {
-                        
GFacUtils.createExperimentEntryForPassive(event.getExperimentId(), 
event.getTaskId(), curatorClient,
-                                experimentNode, nodeName, event.getTokenId(), 
message.getDeliveryTag());
-                        
AiravataZKUtils.getExpStatePath(event.getExperimentId());
+                           GFacUtils.createExperimentNode(curatorClient, 
gfacServerName, event.getExperimentId(), message.getDeliveryTag(),
+                                           event.getTokenId());
                         submitJob(event.getExperimentId(), event.getTaskId(), 
event.getGatewayId(), event.getTokenId());
                     } catch (Exception e) {
                         log.error(e.getMessage(), e);
@@ -285,38 +252,27 @@ public class GfacServerHandler implements 
GfacService.Iface {
                     log.error("Error while updating experiment status", e);
                 }
             } else if (message.getType().equals(MessageType.TERMINATETASK)) {
-                boolean cancelSuccess = false;
                 TaskTerminateEvent event = new TaskTerminateEvent();
                 TBase messageEvent = message.getEvent();
                 try {
                     byte[] bytes = 
ThriftUtils.serializeThriftObject(messageEvent);
                     ThriftUtils.createThriftFromBytes(bytes, event);
-                    boolean saveDeliveryTagSuccess = 
GFacUtils.setExperimentCancel(event.getExperimentId(), curatorClient, 
message.getDeliveryTag());
-                    if (saveDeliveryTagSuccess) {
-                        cancelSuccess = cancelJob(event.getExperimentId(), 
event.getTaskId(), event.getGatewayId(), event.getTokenId());
-                        System.out.println(" Message Received with message id 
'" + message.getMessageId()
-                                + "' and with message type '" + 
message.getType());
-                    } else {
-                        throw new GFacException("Terminate Task fail to save 
delivery tag : " + String.valueOf(message.getDeliveryTag()) + " \n" +
-                                "This happens when another cancel operation is 
being processed or experiment is in one of final states, 
complete|failed|cancelled.");
-                    }
+                       boolean success = 
GFacUtils.setExperimentCancelRequest(event.getExperimentId(), curatorClient,
+                                       message.getDeliveryTag());
+                       if (success) {
+                               log.info("expId:{} - Experiment cancel request 
save successfully", event.getExperimentId());
+                       }
                 } catch (Exception e) {
-                    log.error(e.getMessage(), e);
+                       log.error("expId:" + event.getExperimentId() + " - 
Experiment cancel reqeust failed", e);
                 }finally {
-                    if (cancelSuccess) {
-                        // if cancel success , AiravataExperimentStatusUpdator 
will send an ack to this message.
-                    } else {
-                        try {
-                            if 
(GFacUtils.ackCancelRequest(event.getExperimentId(), curatorClient)) {
-                                if (!rabbitMQTaskLaunchConsumer.isOpen()) {
-                                    rabbitMQTaskLaunchConsumer.reconnect();
-                                }
-                                
rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
-                            }
-                        } catch (Exception e) {
-                            log.error("Error while ack to cancel request, 
experimentId: " + event.getExperimentId());
-                        }
-                    }
+                       try {
+                               if (!rabbitMQTaskLaunchConsumer.isOpen()) {
+                                       rabbitMQTaskLaunchConsumer.reconnect();
+                               }
+                               
rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
+                       } catch (AiravataException e) {
+                               log.error("expId: " + event.getExperimentId() + 
" - Failed to send acknowledgement back to cancel request.", e);
+                       }
                 }
             }
         }

Reply via email to