Adding support for proper acking of messages

Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/1231c014
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/1231c014
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/1231c014

Branch: refs/heads/queue-gfac-rabbitmq
Commit: 1231c014bebd1d23c2bdd340b7d721abe279d45a
Parents: ffbb1b9
Author: Lahiru Gunathilake <[email protected]>
Authored: Wed Feb 25 00:59:09 2015 -0500
Committer: Lahiru Gunathilake <[email protected]>
Committed: Wed Feb 25 00:59:09 2015 -0500

----------------------------------------------------------------------
 .../airavata/api/server/AiravataAPIServer.java  |  1 +
 .../client/samples/CreateLaunchExperiment.java  | 23 +++--
 .../airavata/gfac/server/GfacServerHandler.java | 48 +++++++++--
 .../airavata/gfac/core/cpi/BetterGfacImpl.java  |  1 +
 .../core/monitor/GfacInternalStatusUpdator.java |  3 +
 .../airavata/gfac/core/utils/GFacUtils.java     | 21 +++--
 .../handlers/GridPullMonitorHandler.java        |  1 +
 .../messaging/client/RabbitMQListner.java       |  4 +-
 .../airavata/messaging/core/MessageContext.java | 17 ++++
 .../core/impl/RabbitMQTaskLaunchConsumer.java   | 10 ++-
 .../server/OrchestratorServerHandler.java       | 90 ++++++++++----------
 .../util/OrchestratorRecoveryHandler.java       |  1 +
 .../core/impl/GFACPassiveJobSubmitter.java      | 10 +--
 13 files changed, 151 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/1231c014/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/AiravataAPIServer.java
----------------------------------------------------------------------
diff --git 
a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/AiravataAPIServer.java
 
b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/AiravataAPIServer.java
index 0e6da90..da42ce0 100644
--- 
a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/AiravataAPIServer.java
+++ 
b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/AiravataAPIServer.java
@@ -299,6 +299,7 @@ public class AiravataAPIServer implements IServer, Watcher{
 
     @Override
     synchronized public void process(WatchedEvent watchedEvent) {
+        logger.info(watchedEvent.getPath());
         synchronized (mutex) {
             Event.KeeperState state = watchedEvent.getState();
             logger.info(state.name());

http://git-wip-us.apache.org/repos/asf/airavata/blob/1231c014/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
----------------------------------------------------------------------
diff --git 
a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
 
b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
index c4c303f..78c2d71 100644
--- 
a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
+++ 
b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
@@ -47,17 +47,17 @@ import java.util.*;
 public class CreateLaunchExperiment {
 
     //FIXME: Read from a config file
-//    public static final String THRIFT_SERVER_HOST = "localhost";
-//    public static final int THRIFT_SERVER_PORT = 8930;
-       public static final String THRIFT_SERVER_HOST = "gw111.iu.xsede.org";
-       public static final int THRIFT_SERVER_PORT = 9930;
+    public static final String THRIFT_SERVER_HOST = "localhost";
+    public static final int THRIFT_SERVER_PORT = 8930;
+//     public static final String THRIFT_SERVER_HOST = "gw111.iu.xsede.org";
+//     public static final int THRIFT_SERVER_PORT = 9930;
        
     private final static Logger logger = 
LoggerFactory.getLogger(CreateLaunchExperiment.class);
     private static final String DEFAULT_USER = "default.registry.user";
     private static final String DEFAULT_GATEWAY = "default.registry.gateway";
     private static Airavata.Client airavataClient;
 
-    private static String echoAppId = 
"Echo_a8fc8511-7b8e-431a-ad0f-de5eb1a9c576";
+    private static String echoAppId = 
"Echo_1365a7fd-eae1-4575-b447-99afb4d79c82";
     private static String mpiAppId = 
"HelloMPI_720e159f-198f-4daa-96ca-9f5eafee92c9";
     private static String wrfAppId = 
"WRF_7ad5da38-c08b-417c-a9ea-da9298839762";
     private static String amberAppId = 
"Amber_42124128-628b-484c-829d-aff8b584eb00";
@@ -93,7 +93,7 @@ public class CreateLaunchExperiment {
 //        final String expId = createEchoExperimentForFSD(airavataClient);
         List<String> experimentIds = new ArrayList<String>();
         try {
-            for (int i = 0; i < 100; i++) {
+            for (int i = 0; i < 1; i++) {
 //                final String expId = createExperimentForSSHHost(airavata);
 //                final String expId = 
createEchoExperimentForFSD(airavataClient);
 //                final String expId = 
createMPIExperimentForFSD(airavataClient);
@@ -120,12 +120,11 @@ public class CreateLaunchExperiment {
                 launchExperiment(airavataClient, expId);
             }
 
-            Thread.sleep(10000);
-
-            for(String exId:experimentIds) {
-                Experiment experiment = airavataClient.getExperiment(exId);
-                
System.out.println(experiment.getExperimentStatus().toString());
-            }
+            Thread.sleep(100);
+                for (String exId : experimentIds) {
+                    Experiment experiment = airavataClient.getExperiment(exId);
+                    
System.out.println(experiment.getExperimentStatus().toString());
+                }
 
 
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/1231c014/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
 
b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 1c0f095..cca793e 100644
--- 
a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ 
b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -65,8 +65,6 @@ public class GfacServerHandler implements GfacService.Iface, 
Watcher {
     private Registry registry;
     private AppCatalog appCatalog;
 
-    private String registryURL;
-
     private String gatewayName;
 
     private String airavataUserName;
@@ -144,12 +142,13 @@ public class GfacServerHandler implements 
GfacService.Iface, Watcher {
                     CreateMode.PERSISTENT);
         }
         String instanceId = 
ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME);
-        String instantNode = gfacServer + File.separator + instanceId;
-        zkStat = zk.exists(instantNode, true);
+        String instanceNode = gfacServer + File.separator + instanceId;
+        zkStat = zk.exists(instanceNode, true);
         if (zkStat == null) {
-            zk.create(instantNode,
+            zk.create(instanceNode,
                     airavataServerHostPort.getBytes(), 
ZooDefs.Ids.OPEN_ACL_UNSAFE,
                     CreateMode.EPHEMERAL);      // other component will watch 
these childeren creation deletion to monitor the status of the node
+            zk.getChildren(instanceNode, true);
         }
         zkStat = zk.exists(gfacExperiments, false);
         if (zkStat == null) {
@@ -168,6 +167,8 @@ public class GfacServerHandler implements 
GfacService.Iface, Watcher {
     }
 
     synchronized public void process(WatchedEvent watchedEvent) {
+        logger.info(watchedEvent.getPath());
+        logger.info(watchedEvent.getType().toString());
         synchronized (mutex) {
             Event.KeeperState state = watchedEvent.getState();
             logger.info(state.name());
@@ -191,10 +192,39 @@ public class GfacServerHandler implements 
GfacService.Iface, Watcher {
                 } catch (KeeperException e) {
                     logger.error(e.getMessage(), e);
                 }
+            } else if 
(Event.EventType.NodeDeleted.equals(watchedEvent.getType())) {
+                String path = watchedEvent.getPath();
+                String experimentNode = 
ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE,
 "/gfac-experiments");
+                if (path.startsWith(experimentNode)) {
+                    // we got a watch when experiment is removed
+                    String deliveryPath = path + 
GFacUtils.DELIVERY_TAG_POSTFIX;
+                    try {
+                        Stat exists = zk.exists(deliveryPath, false);
+                        byte[] data = zk.getData(path + 
GFacUtils.DELIVERY_TAG_POSTFIX, false, exists);
+                        long value = ByateArrayToLong(data);
+                        logger.info("ExperimentId+taskId" + path);
+                        logger.info("Sending Ack back to the Queue, because 
task is over");
+                        rabbitMQTaskLaunchConsumer.sendAck(value);
+                        ZKUtil.deleteRecursive(zk,deliveryPath);
+                    } catch (KeeperException e) {
+                        logger.error(e.getMessage(), e);
+                    } catch (InterruptedException e) {
+                        logger.error(e.getMessage(), e);
+                    }
+                }
             }
         }
     }
 
+    private long ByateArrayToLong(byte[] data) {
+        long value = 0;
+        for (int i = 0; i < data.length; i++)
+        {
+            value += ((long) data[i] & 0xffL) << (8 * i);
+        }
+        return value;
+    }
+
     public String getGFACServiceVersion() throws TException {
         return gfac_cpi_serviceConstants.GFAC_CPI_VERSION;
     }
@@ -314,12 +344,18 @@ public class GfacServerHandler implements 
GfacService.Iface, Watcher {
                     experimentNode = 
ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, 
"/gfac-experiments");
 
                     try {
-                        
GFacUtils.createExperimentEntryForRPC(event.getExperimentId(), 
event.getTaskId(), zk, experimentNode, nodeName, event.getTokenId());
+                        
GFacUtils.createExperimentEntryForPassive(event.getExperimentId(), 
event.getTaskId(), zk, experimentNode, nodeName, event.getTokenId(), 
message.getDeliveryTag());
+                        
AiravataZKUtils.getExpStatePath(event.getExperimentId(),event.getTaskId());
                         submitJob(event.getExperimentId(), event.getTaskId(), 
event.getGatewayId());
                     } catch (KeeperException e) {
                         logger.error(nodeName + " was interrupted.");
+                        
rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
                     } catch (InterruptedException e) {
                         logger.error(e.getMessage(), e);
+                        
rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
+                    } catch (ApplicationSettingsException e) {
+                        logger.error(e.getMessage(), e);
+                        
rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
                     }
                     System.out.println(" Message Received with message id '" + 
message.getMessageId()
                             + "' and with message type '" + message.getType());

http://git-wip-us.apache.org/repos/asf/airavata/blob/1231c014/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index bb612a6..00930e5 100644
--- 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -1158,6 +1158,7 @@ public class BetterGfacImpl implements GFac,Watcher {
     }
 
     public void process(WatchedEvent watchedEvent) {
+        log.info(watchedEvent.getPath());
         if(Event.EventType.NodeDataChanged.equals(watchedEvent.getType())){
             // node data is changed, this means node is cancelled.
             log.info("Experiment is cancelled with this 
path:"+watchedEvent.getPath());

http://git-wip-us.apache.org/repos/asf/airavata/blob/1231c014/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
index 7818da0..26902e7 100644
--- 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
+++ 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
@@ -29,6 +29,7 @@ import org.apache.airavata.common.utils.Constants;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.listener.AbstractActivityListener;
 import 
org.apache.airavata.gfac.core.monitor.state.GfacExperimentStateChangeRequest;
+import org.apache.airavata.gfac.core.utils.GFacUtils;
 import org.apache.zookeeper.*;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
@@ -49,6 +50,7 @@ public class GfacInternalStatusUpdator implements 
AbstractActivityListener, Watc
         MonitorID monitorID = statusChangeRequest.getMonitorID();
         String experimentPath = 
ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, 
"/gfac-experiments") +
                 File.separator + 
ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME) + 
File.separator + statusChangeRequest.getMonitorID().getExperimentID() + "+" + 
monitorID.getTaskID();
+        String deliveryTagPath = experimentPath + 
GFacUtils.DELIVERY_TAG_POSTFIX;
         Stat exists = null;
         try {
             if (!zk.getState().isConnected()) {
@@ -107,6 +109,7 @@ public class GfacInternalStatusUpdator implements 
AbstractActivityListener, Watc
     }
 
     public void process(WatchedEvent watchedEvent) {
+        logger.info(watchedEvent.getPath());
         synchronized (mutex) {
             Event.KeeperState state = watchedEvent.getState();
             if (state == Event.KeeperState.SyncConnected) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/1231c014/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
index 9f104fa..c825ffd 100644
--- 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
+++ 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
@@ -60,12 +60,14 @@ import java.io.*;
 import java.net.InetAddress;
 import java.net.URISyntaxException;
 import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
 import java.util.*;
 
 //import org.apache.airavata.commons.gfac.type.ActualParameter;
 
 public class GFacUtils {
        private final static Logger log = 
LoggerFactory.getLogger(GFacUtils.class);
+       public static final String DELIVERY_TAG_POSTFIX = "-deliveryTag";
 
        private GFacUtils() {
        }
@@ -1156,7 +1158,7 @@ public class GFacUtils {
        // This method is dangerous because of moving the experiment data
        public static boolean createExperimentEntryForPassive(String 
experimentID,
                                                                                
                          String taskID, ZooKeeper zk, String experimentNode,
-                                                                               
                          String pickedChild, String tokenId) throws 
KeeperException,
+                                                                               
                          String pickedChild, String tokenId,long deliveryTag) 
throws KeeperException,
                        InterruptedException {
                String experimentPath = experimentNode + File.separator + 
pickedChild;
                String newExpNode = experimentPath + File.separator + 
experimentID
@@ -1165,15 +1167,14 @@ public class GFacUtils {
                String experimentEntry = 
GFacUtils.findExperimentEntry(experimentID, taskID, zk);
                String foundExperimentPath = null;
                if (exists1 == null && experimentEntry == null) {  // this 
means this is a very new experiment
-                       List<String> runningGfacNodeNames = AiravataZKUtils
-                                       .getAllGfacNodeNames(zk); // here we 
take old gfac servers
-                       // too
+                       List<String> runningGfacNodeNames = 
AiravataZKUtils.getAllGfacNodeNames(zk); // here we take old gfac servers
+
                        for (String gfacServerNode : runningGfacNodeNames) {
                                if (!gfacServerNode.equals(pickedChild)) {
                                        foundExperimentPath = experimentNode + 
File.separator
                                                        + gfacServerNode + 
File.separator + experimentID
                                                        + "+" + taskID;
-                                       exists1 = 
zk.exists(foundExperimentPath, false);
+                                       exists1 = 
zk.exists(foundExperimentPath, true);
                                        if (exists1 != null) { // when the 
experiment is found we
                                                // break the loop
                                                break;
@@ -1183,21 +1184,23 @@ public class GFacUtils {
                        if (exists1 == null) { // OK this is a pretty new 
experiment so we
                                // are going to create a new node
                                log.info("This is a new Job, so creating all 
the experiment docs from the scratch");
+                               Stat expParent = zk.exists(newExpNode, false);
                                zk.create(newExpNode, new byte[0], 
ZooDefs.Ids.OPEN_ACL_UNSAFE,
                                                CreateMode.PERSISTENT);
 
-                               Stat expParent = zk.exists(newExpNode, false);
                                if (tokenId != null && expParent != null) {
                                        zk.setData(newExpNode, 
tokenId.getBytes(),
                                                        expParent.getVersion());
                                }
-                               zk.create(newExpNode + File.separator + 
"state", String
+                               String s = zk.create(newExpNode + 
File.separator + "state", String
                                                                
.valueOf(GfacExperimentState.LAUNCHED.getValue())
                                                                .getBytes(), 
ZooDefs.Ids.OPEN_ACL_UNSAFE,
                                                CreateMode.PERSISTENT);
-                               zk.create(newExpNode + File.separator + 
"operation","submit".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                               String s1 = zk.create(newExpNode + 
File.separator + "operation", "submit".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                                               CreateMode.PERSISTENT);
+                               zk.exists(s1, true);// we want to know when 
this node get deleted
+                               String s2 = zk.create(newExpNode + 
DELIVERY_TAG_POSTFIX, ByteBuffer.allocate(8).putLong(deliveryTag).array(), 
ZooDefs.Ids.OPEN_ACL_UNSAFE,  // here we store the value of delivery message
                                                CreateMode.PERSISTENT);
-
                        } else {
                                // ohhh this node exists in some other failed 
gfac folder, we
                                // have to move it to this gfac experiment 
list,safely

http://git-wip-us.apache.org/repos/asf/airavata/blob/1231c014/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
 
b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
index e64f596..d5f9f90 100644
--- 
a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
+++ 
b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
@@ -125,6 +125,7 @@ public class GridPullMonitorHandler extends ThreadedHandler 
implements Watcher{
 
 
     public void process(WatchedEvent watchedEvent) {
+        logger.info(watchedEvent.getPath());
         if(Event.EventType.NodeDataChanged.equals(watchedEvent.getType())){
             // node data is changed, this means node is cancelled.
             logger.info("Experiment is cancelled with this 
path:"+watchedEvent.getPath());

http://git-wip-us.apache.org/repos/asf/airavata/blob/1231c014/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListner.java
----------------------------------------------------------------------
diff --git 
a/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListner.java
 
b/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListner.java
index 601497a..48edbe8 100644
--- 
a/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListner.java
+++ 
b/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListner.java
@@ -28,7 +28,7 @@ import org.apache.airavata.common.utils.ThriftUtils;
 import org.apache.airavata.messaging.core.MessageContext;
 import org.apache.airavata.messaging.core.MessageHandler;
 import org.apache.airavata.messaging.core.MessagingConstants;
-import org.apache.airavata.messaging.core.impl.RabbitMQConsumer;
+import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer;
 import org.apache.airavata.model.messaging.event.*;
 import org.apache.airavata.model.workspace.experiment.ExperimentState;
 import org.apache.commons.cli.*;
@@ -67,7 +67,7 @@ public class RabbitMQListner {
             String brokerUrl = ServerSettings.getSetting(RABBITMQ_BROKER_URL);
             System.out.println("broker url " + brokerUrl);
             final String exchangeName = 
ServerSettings.getSetting(RABBITMQ_EXCHANGE_NAME);
-            RabbitMQConsumer consumer = new RabbitMQConsumer(brokerUrl, 
exchangeName);
+            RabbitMQStatusConsumer consumer = new 
RabbitMQStatusConsumer(brokerUrl, exchangeName);
             consumer.listen(new MessageHandler() {
                 @Override
                 public Map<String, Object> getProperties() {

http://git-wip-us.apache.org/repos/asf/airavata/blob/1231c014/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java
----------------------------------------------------------------------
diff --git 
a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java
 
b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java
index 0a39d92..272f413 100644
--- 
a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java
+++ 
b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java
@@ -32,6 +32,7 @@ public class MessageContext {
     private final String messageId;
     private final String gatewayId;
     private Timestamp updatedTime;
+    private long deliveryTag;
 
 
     public MessageContext(TBase message, MessageType type, String messageId, 
String gatewayId) {
@@ -41,6 +42,14 @@ public class MessageContext {
         this.gatewayId = gatewayId;
     }
 
+    public MessageContext(TBase event, MessageType type, String messageId, 
String gatewayId, long deliveryTag) {
+        this.event = event;
+        this.type = type;
+        this.messageId = messageId;
+        this.gatewayId = gatewayId;
+        this.deliveryTag = deliveryTag;
+    }
+
     public TBase getEvent() {
         return event;
     }
@@ -64,4 +73,12 @@ public class MessageContext {
     public String getGatewayId() {
         return gatewayId;
     }
+
+    public long getDeliveryTag() {
+        return deliveryTag;
+    }
+
+    public void setDeliveryTag(long deliveryTag) {
+        this.deliveryTag = deliveryTag;
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/1231c014/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
----------------------------------------------------------------------
diff --git 
a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
 
b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
index 1c7b0e8..7c88a25 100644
--- 
a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
+++ 
b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
@@ -165,7 +165,7 @@ public class RabbitMQTaskLaunchConsumer {
                             event = taskTerminateEvent;
                             gatewayId = null;
                         }
-                        MessageContext messageContext = new 
MessageContext(event, message.getMessageType(), message.getMessageId(), 
gatewayId);
+                        MessageContext messageContext = new 
MessageContext(event, message.getMessageType(), message.getMessageId(), 
gatewayId,deliveryTag);
                         
messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
                         handler.onMessage(messageContext);
                         try {
@@ -241,4 +241,12 @@ public class RabbitMQTaskLaunchConsumer {
             }
         }
     }
+
+    public void sendAck(long deliveryTag){
+        try {
+            channel.basicAck(deliveryTag,false); //todo move this logic to 
monitoring component to ack when the job is done
+        } catch (IOException e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/1231c014/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
 
b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index b200468..f430bc9 100644
--- 
a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ 
b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -292,43 +292,45 @@ public class OrchestratorServerHandler implements 
OrchestratorService.Iface,
         * This method gracefully handler gfac node failures
         */
        synchronized public void process(WatchedEvent watchedEvent) {
+               log.info(watchedEvent.getPath());
                synchronized (mutex) {
                        try {
                                Event.KeeperState state = 
watchedEvent.getState();
                                switch (state) {
-                               case SyncConnected:
-                                       mutex.notify();
-                                       break;
-                case Expired:case Disconnected:
-                        try {
-                            zk = new 
ZooKeeper(AiravataZKUtils.getZKhostPort(), 6000, this);
-                            synchronized (mutex) {
-                                mutex.wait(); // waiting for the syncConnected 
event
-                            }
-                            String airavataServerHostPort = ServerSettings
-                                    
.getSetting(Constants.ORCHESTRATOR_SERVER_HOST)
-                                    + ":"
-                                    + ServerSettings
-                                    
.getSetting(Constants.ORCHESTRATOR_SERVER_PORT);
-                            String OrchServer = ServerSettings
-                                    
.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_ORCHESTRATOR_SERVER_NODE);
-                            
registerOrchestratorService(airavataServerHostPort, OrchServer);
-                        } catch (IOException e) {
-                            e.printStackTrace();
-                        } catch (ApplicationSettingsException e) {
-                            e.printStackTrace();
-                        } catch (InterruptedException e) {
-                            e.printStackTrace();
-                        } catch (KeeperException e) {
-                            e.printStackTrace();
-                        }
-                        break;
-                }
+                                       case SyncConnected:
+                                               mutex.notify();
+                                               break;
+                                       case Expired:
+                                       case Disconnected:
+                                               try {
+                                                       zk = new 
ZooKeeper(AiravataZKUtils.getZKhostPort(), 6000, this);
+                                                       synchronized (mutex) {
+                                                               mutex.wait(); 
// waiting for the syncConnected event
+                                                       }
+                                                       String 
airavataServerHostPort = ServerSettings
+                                                                       
.getSetting(Constants.ORCHESTRATOR_SERVER_HOST)
+                                                                       + ":"
+                                                                       + 
ServerSettings
+                                                                       
.getSetting(Constants.ORCHESTRATOR_SERVER_PORT);
+                                                       String OrchServer = 
ServerSettings
+                                                                       
.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_ORCHESTRATOR_SERVER_NODE);
+                                                       
registerOrchestratorService(airavataServerHostPort, OrchServer);
+                                               } catch (IOException e) {
+                                                       e.printStackTrace();
+                                               } catch 
(ApplicationSettingsException e) {
+                                                       e.printStackTrace();
+                                               } catch (InterruptedException 
e) {
+                                                       e.printStackTrace();
+                                               } catch (KeeperException e) {
+                                                       e.printStackTrace();
+                                               }
+                                               break;
+                               }
                                if (watchedEvent.getPath() != null
                                                && 
watchedEvent.getPath().startsWith(
-                                                               
ServerSettings.getSetting(
-                                                                               
Constants.ZOOKEEPER_GFAC_SERVER_NODE,
-                                                                               
"/gfac-server"))) {
+                                               ServerSettings.getSetting(
+                                                               
Constants.ZOOKEEPER_GFAC_SERVER_NODE,
+                                                               
"/gfac-server"))) {
                                        List<String> children = 
zk.getChildren(ServerSettings
                                                        
.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE,
                                                                        
"/gfac-server"), true);
@@ -340,18 +342,18 @@ public class OrchestratorServerHandler implements 
OrchestratorService.Iface,
                                                                                
+ File.separator + gfacNodes, this);
                                        }
                                        switch (watchedEvent.getType()) {
-                                       case NodeCreated:
-                                               mutex.notify();
-                                               break;
-                                       case NodeDeleted:
-                                               // here we have to handle gfac 
node shutdown case
-                                               if (children.size() == 0) {
-                                                       log.error("There are 
not gfac instances to route failed jobs");
-                                                       return;
-                                               }
-                                               // we recover one gfac node at 
a time
-                                               final WatchedEvent event = 
watchedEvent;
-                                               final OrchestratorServerHandler 
handler = this;
+                                               case NodeCreated:
+                                                       mutex.notify();
+                                                       break;
+                                               case NodeDeleted:
+                                                       // here we have to 
handle gfac node shutdown case
+                                                       if (children.size() == 
0) {
+                                                               
log.error("There are not gfac instances to route failed jobs");
+                                                               return;
+                                                       }
+                                                       // we recover one gfac 
node at a time
+                                                       final WatchedEvent 
event = watchedEvent;
+                                                       final 
OrchestratorServerHandler handler = this;
                                                /*(new Thread() {  // disabling 
ft implementation with zk
                                                        public void run() {
                                                                int retry = 0;
@@ -372,7 +374,7 @@ public class OrchestratorServerHandler implements 
OrchestratorService.Iface,
 
                                                        }
                                                }).start();*/
-                                               break;
+                                                       break;
                                        }
 
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/1231c014/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
 
b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
index fb3bd51..f19b949 100644
--- 
a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
+++ 
b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
@@ -95,6 +95,7 @@ public class OrchestratorRecoveryHandler implements Watcher {
     }
 
     synchronized public void process(WatchedEvent watchedEvent) {
+        log.info(watchedEvent.getPath());
         synchronized (mutex) {
             Event.KeeperState state = watchedEvent.getState();
             switch (state) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/1231c014/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
----------------------------------------------------------------------
diff --git 
a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
 
b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
index b5e25b1..8066113 100644
--- 
a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
+++ 
b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
@@ -187,11 +187,9 @@ public class GFACPassiveJobSubmitter implements 
JobSubmitter,Watcher {
                 String[] split = gfacNodeData.split(":");
                 if (zk.exists(gfacServer + File.separator + pickedChild, 
false) != null) {
                     // before submitting the job we check again the state of 
the node
-                    if (GFacUtils.createExperimentEntryForRPC(experimentID, 
taskID, zk, experimentNode, pickedChild, null)) {
-                        TaskSubmitEvent taskSubmitEvent = new 
TaskSubmitEvent(experimentID, taskID, null,null);
-                        MessageContext messageContext = new 
MessageContext(taskSubmitEvent, MessageType.LAUNCHTASK,"LAUNCH.TERMINATE-"+ 
UUID.randomUUID().toString(),null);
-                        publisher.publish(messageContext);
-                    }
+                    TaskSubmitEvent taskSubmitEvent = new 
TaskSubmitEvent(experimentID, taskID, null, null);
+                    MessageContext messageContext = new 
MessageContext(taskSubmitEvent, MessageType.TERMINATETASK, "LAUNCH.TERMINATE-" 
+ UUID.randomUUID().toString(), null);
+                    publisher.publish(messageContext);
                 }
             }
         } catch (InterruptedException e) {
@@ -217,6 +215,8 @@ public class GFACPassiveJobSubmitter implements 
JobSubmitter,Watcher {
     }
 
     synchronized public void process(WatchedEvent event) {
+        logger.info(getClass().getName() + event.getPath());
+        logger.info(getClass().getName()+event.getType());
         synchronized (mutex) {
             switch (event.getState()) {
                 case SyncConnected:

Reply via email to