Repository: airavata
Updated Branches:
  refs/heads/queue-gfac-rabbitmq cb6c4ccf2 -> 5310bb4b5


deleting delivery token


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/5310bb4b
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/5310bb4b
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/5310bb4b

Branch: refs/heads/queue-gfac-rabbitmq
Commit: 5310bb4b570bd0b09208e710f121e88418261c5f
Parents: cb6c4cc
Author: Lahiru Gunathilake <[email protected]>
Authored: Fri Mar 20 16:14:07 2015 -0400
Committer: Lahiru Gunathilake <[email protected]>
Committed: Fri Mar 20 16:14:07 2015 -0400

----------------------------------------------------------------------
 .../airavata/gfac/server/GfacServerHandler.java |   3 +
 .../core/monitor/GfacInternalStatusUpdator.java |   2 +
 .../airavata/gfac/core/utils/GFacUtils.java     | 134 ++++++++-----------
 3 files changed, 61 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/5310bb4b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
 
b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index d45710e..855bfc5 100644
--- 
a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ 
b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -59,6 +59,7 @@ import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Future;
+import java.util.concurrent.locks.Lock;
 
 
 public class GfacServerHandler implements GfacService.Iface, Watcher {
@@ -79,6 +80,8 @@ public class GfacServerHandler implements GfacService.Iface, 
Watcher {
 
     private static Integer mutex = -1;
 
+    private static Lock lock;
+
     private MonitorPublisher publisher;
 
     private String gfacServer;

http://git-wip-us.apache.org/repos/asf/airavata/blob/5310bb4b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
index eaa3c5f..6c456b0 100644
--- 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
+++ 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
@@ -98,6 +98,7 @@ public class GfacInternalStatusUpdator implements 
AbstractActivityListener, Watc
                     
consumer.sendAck(GFacUtils.getDeliveryTag(statusChangeRequest.getMonitorID().getExperimentID(),
                             monitorID.getTaskID(), zk, experimentNode, 
ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME)));
                 }
+                
ZKUtil.deleteRecursive(zk,experimentPath+GFacUtils.DELIVERY_TAG_POSTFIX);
                 ZKUtil.deleteRecursive(zk, experimentPath);
                 break;
             case FAILED:
@@ -107,6 +108,7 @@ public class GfacInternalStatusUpdator implements 
AbstractActivityListener, Watc
                     
consumer.sendAck(GFacUtils.getDeliveryTag(statusChangeRequest.getMonitorID().getExperimentID(),
                             monitorID.getTaskID(), zk, experimentNode, 
ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME)));
                 }
+                
ZKUtil.deleteRecursive(zk,experimentPath+GFacUtils.DELIVERY_TAG_POSTFIX);
                 ZKUtil.deleteRecursive(zk, experimentPath);
                 break;
             default:

http://git-wip-us.apache.org/repos/asf/airavata/blob/5310bb4b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
index 707cf97..a0fc2cb 100644
--- 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
+++ 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
@@ -1157,94 +1157,72 @@ public class GFacUtils {
 
        // This method is dangerous because of moving the experiment data
        public static boolean createExperimentEntryForPassive(String 
experimentID,
-                                                                               
                          String taskID, ZooKeeper zk, String experimentNode,
-                                                                               
                          String pickedChild, String tokenId,long deliveryTag) 
throws KeeperException,
+                                                                               
                                  String taskID, ZooKeeper zk, String 
experimentNode,
+                                                                               
                                  String pickedChild, String tokenId, long 
deliveryTag) throws KeeperException,
                        InterruptedException, ApplicationSettingsException {
                String experimentPath = experimentNode + File.separator + 
pickedChild;
                String newExpNode = experimentPath + File.separator + 
experimentID
                                + "+" + taskID;
                Stat exists1 = zk.exists(newExpNode, false);
                String experimentEntry = 
GFacUtils.findExperimentEntry(experimentID, taskID, zk);
-               if (exists1 == null && experimentEntry == null) {  // this 
means this is a very new experiment
-                               // are going to create a new node
-                               log.info("This is a new Job, so creating all 
the experiment docs from the scratch");
-                               Stat expParent = zk.exists(newExpNode, false);
-                               zk.create(newExpNode, new byte[0], 
ZooDefs.Ids.OPEN_ACL_UNSAFE,
-                                               CreateMode.PERSISTENT);
-
-                               if (tokenId != null && expParent != null) {
-                                       zk.setData(newExpNode, 
tokenId.getBytes(),
-                                                       expParent.getVersion());
-                               }
-                               String s = zk.create(newExpNode + 
File.separator + "state", String
-                                                               
.valueOf(GfacExperimentState.LAUNCHED.getValue())
-                                                               .getBytes(), 
ZooDefs.Ids.OPEN_ACL_UNSAFE,
-                                               CreateMode.PERSISTENT);
-                               String s1 = zk.create(newExpNode + 
File.separator + "operation", "submit".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
-                                               CreateMode.PERSISTENT);
-                               zk.exists(s1, true);// we want to know when 
this node get deleted
-                               String s2 = zk.create(newExpNode + 
DELIVERY_TAG_POSTFIX, longToBytes(deliveryTag), ZooDefs.Ids.OPEN_ACL_UNSAFE,  
// here we store the value of delivery message
-                                               CreateMode.PERSISTENT);
-               }else if(experimentEntry != null && 
GFacUtils.isCancelled(experimentID,taskID,zk) ){
-                       // this happens when a cancel request comes to a 
differnt gfac node, in this case we do not move gfac experiment
-                       // node to gfac node specific location, because 
original request execution will fail with errors
-                       log.error("This experiment is already cancelled and its 
already executing the cancel operation so cannot submit again !");
+               if (exists1 != null) {
+                       log.error("This request is wrong because its already 
running in the same instance");
                        return false;
-               } else if(experimentEntry != null && 
!GFacUtils.isCancelled(experimentID,taskID,zk)){
-                       if(ServerSettings.isGFacPassiveMode()){
-                               log.error("ExperimentID: " + experimentID + " 
taskID: " + taskID
-                                               + " was running by some Gfac 
instance,but it failed");
-                               log.info("This is an old Job, so copying data 
from old experiment location");
-                               zk.create(newExpNode,
-                                               zk.getData(experimentEntry, 
false, exists1),
-                                               ZooDefs.Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
+               } else if (experimentEntry == null) {  // this means this is a 
very new experiment
+                       // are going to create a new node
+                       log.info("This is a new Job, so creating all the 
experiment docs from the scratch");
+                       Stat expParent = zk.exists(newExpNode, false);
+                       zk.create(newExpNode, new byte[0], 
ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                                       CreateMode.PERSISTENT);
 
-                               List<String> children = 
zk.getChildren(experimentEntry,
-                                               false);
-                               for (String childNode1 : children) {
-                                       String level1 = experimentEntry + 
File.separator
-                                                       + childNode1;
-                                       Stat exists2 = zk.exists(level1, 
false); // no need to check exists
-                                       String newLeve1 = newExpNode + 
File.separator + childNode1;
-                                       log.info("Creating new znode: " + 
newLeve1); // these has to be info logs
-                                       zk.create(newLeve1, zk.getData(level1, 
false, exists2),
-                                                       
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-                                       for (String childNode2 : 
zk.getChildren(level1, false)) {
-                                               String level2 = level1 + 
File.separator + childNode2;
-                                               Stat exists3 = 
zk.exists(level2, false); // no need to check exists
-                                               String newLeve2 = newLeve1 + 
File.separator
-                                                               + childNode2;
-                                               log.info("Creating new znode: " 
+ newLeve2);
-                                               zk.create(newLeve2, 
zk.getData(level2, false, exists3),
-                                                               
ZooDefs.Ids.OPEN_ACL_UNSAFE,
-                                                               
CreateMode.PERSISTENT);
-                                       }
-                               }
-                               // After all the files are successfully 
transfered we delete the
-                               // old experiment,otherwise we do
-                               // not delete a single file
-                               log.info("After a successful copying of 
experiment data for an old experiment we delete the old data");
-                               log.info("Deleting experiment data: " + 
experimentEntry);
-                               ZKUtil.deleteRecursive(zk, experimentEntry);
-                       }else {
-                               log.error("ExperimentID: " + experimentID + " 
taskID: " + taskID
-                                               + " is already running by this 
Gfac instance");
-                               List<String> runningGfacNodeNames = 
AiravataZKUtils
-                                               .getAllGfacNodeNames(zk); // 
here we take old gfac servers
-                               // too
-                               for (String gfacServerNode : 
runningGfacNodeNames) {
-                                       if 
(!gfacServerNode.equals(pickedChild)) {
-                                               experimentEntry = 
experimentNode + File.separator
-                                                               + 
gfacServerNode + File.separator + experimentID
-                                                               + "+" + taskID;
-                                               break;
-                                       }
-                               }
-                               if(experimentEntry!=null) {
-                                       ZKUtil.deleteRecursive(zk, 
experimentEntry);
-                               }
+                       if (tokenId != null && expParent != null) {
+                               zk.setData(newExpNode, tokenId.getBytes(),
+                                               expParent.getVersion());
                        }
+                       String s = zk.create(newExpNode + File.separator + 
"state", String
+                                                       
.valueOf(GfacExperimentState.LAUNCHED.getValue())
+                                                       .getBytes(), 
ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                                       CreateMode.PERSISTENT);
+                       String s1 = zk.create(newExpNode + File.separator + 
"operation", "submit".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                                       CreateMode.PERSISTENT);
+                       zk.exists(s1, true);// we want to know when this node 
get deleted
+                       String s2 = zk.create(newExpNode + 
DELIVERY_TAG_POSTFIX, longToBytes(deliveryTag), ZooDefs.Ids.OPEN_ACL_UNSAFE,  
// here we store the value of delivery message
+                                       CreateMode.PERSISTENT);
+               } else {
+                       log.error("ExperimentID: " + experimentID + " taskID: " 
+ taskID
+                                       + " was running by some Gfac 
instance,but it failed");
+                       log.info("This is an old Job, so copying data from old 
experiment location");
+                       zk.create(newExpNode,
+                                       zk.getData(experimentEntry, false, 
exists1),
+                                       ZooDefs.Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
 
+                       List<String> children = zk.getChildren(experimentEntry,
+                                       false);
+                       for (String childNode1 : children) {
+                               String level1 = experimentEntry + File.separator
+                                               + childNode1;
+                               Stat exists2 = zk.exists(level1, false); // no 
need to check exists
+                               String newLeve1 = newExpNode + File.separator + 
childNode1;
+                               log.info("Creating new znode: " + newLeve1); // 
these has to be info logs
+                               zk.create(newLeve1, zk.getData(level1, false, 
exists2),
+                                               ZooDefs.Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
+                               for (String childNode2 : zk.getChildren(level1, 
false)) {
+                                       String level2 = level1 + File.separator 
+ childNode2;
+                                       Stat exists3 = zk.exists(level2, 
false); // no need to check exists
+                                       String newLeve2 = newLeve1 + 
File.separator
+                                                       + childNode2;
+                                       log.info("Creating new znode: " + 
newLeve2);
+                                       zk.create(newLeve2, zk.getData(level2, 
false, exists3),
+                                                       
ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                                                       CreateMode.PERSISTENT);
+                               }
+                       }
+                       // After all the files are successfully transfered we 
delete the
+                       // old experiment,otherwise we do
+                       // not delete a single file
+                       log.info("After a successful copying of experiment data 
for an old experiment we delete the old data");
+                       log.info("Deleting experiment data: " + 
experimentEntry);
+                       ZKUtil.deleteRecursive(zk, experimentEntry);
                }
                return true;
        }

Reply via email to