Repository: airavata Updated Branches: refs/heads/queue-gfac-rabbitmq cb6c4ccf2 -> 5310bb4b5
deleting delivery token Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/5310bb4b Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/5310bb4b Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/5310bb4b Branch: refs/heads/queue-gfac-rabbitmq Commit: 5310bb4b570bd0b09208e710f121e88418261c5f Parents: cb6c4cc Author: Lahiru Gunathilake <[email protected]> Authored: Fri Mar 20 16:14:07 2015 -0400 Committer: Lahiru Gunathilake <[email protected]> Committed: Fri Mar 20 16:14:07 2015 -0400 ---------------------------------------------------------------------- .../airavata/gfac/server/GfacServerHandler.java | 3 + .../core/monitor/GfacInternalStatusUpdator.java | 2 + .../airavata/gfac/core/utils/GFacUtils.java | 134 ++++++++----------- 3 files changed, 61 insertions(+), 78 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/5310bb4b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java index d45710e..855bfc5 100644 --- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java +++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java @@ -59,6 +59,7 @@ import java.io.IOException; import java.util.*; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Future; +import java.util.concurrent.locks.Lock; public class GfacServerHandler implements GfacService.Iface, Watcher { @@ -79,6 +80,8 @@ public class GfacServerHandler implements GfacService.Iface, Watcher { private static Integer mutex = -1; + private static Lock lock; + private MonitorPublisher publisher; private String gfacServer; http://git-wip-us.apache.org/repos/asf/airavata/blob/5310bb4b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java index eaa3c5f..6c456b0 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java @@ -98,6 +98,7 @@ public class GfacInternalStatusUpdator implements AbstractActivityListener, Watc consumer.sendAck(GFacUtils.getDeliveryTag(statusChangeRequest.getMonitorID().getExperimentID(), monitorID.getTaskID(), zk, experimentNode, ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME))); } + ZKUtil.deleteRecursive(zk,experimentPath+GFacUtils.DELIVERY_TAG_POSTFIX); ZKUtil.deleteRecursive(zk, experimentPath); break; case FAILED: @@ -107,6 +108,7 @@ public class GfacInternalStatusUpdator implements AbstractActivityListener, Watc consumer.sendAck(GFacUtils.getDeliveryTag(statusChangeRequest.getMonitorID().getExperimentID(), monitorID.getTaskID(), zk, experimentNode, ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME))); } + ZKUtil.deleteRecursive(zk,experimentPath+GFacUtils.DELIVERY_TAG_POSTFIX); ZKUtil.deleteRecursive(zk, experimentPath); break; default: http://git-wip-us.apache.org/repos/asf/airavata/blob/5310bb4b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java index 707cf97..a0fc2cb 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java @@ -1157,94 +1157,72 @@ public class GFacUtils { // This method is dangerous because of moving the experiment data public static boolean createExperimentEntryForPassive(String experimentID, - String taskID, ZooKeeper zk, String experimentNode, - String pickedChild, String tokenId,long deliveryTag) throws KeeperException, + String taskID, ZooKeeper zk, String experimentNode, + String pickedChild, String tokenId, long deliveryTag) throws KeeperException, InterruptedException, ApplicationSettingsException { String experimentPath = experimentNode + File.separator + pickedChild; String newExpNode = experimentPath + File.separator + experimentID + "+" + taskID; Stat exists1 = zk.exists(newExpNode, false); String experimentEntry = GFacUtils.findExperimentEntry(experimentID, taskID, zk); - if (exists1 == null && experimentEntry == null) { // this means this is a very new experiment - // are going to create a new node - log.info("This is a new Job, so creating all the experiment docs from the scratch"); - Stat expParent = zk.exists(newExpNode, false); - zk.create(newExpNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); - - if (tokenId != null && expParent != null) { - zk.setData(newExpNode, tokenId.getBytes(), - expParent.getVersion()); - } - String s = zk.create(newExpNode + File.separator + "state", String - .valueOf(GfacExperimentState.LAUNCHED.getValue()) - .getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); - String s1 = zk.create(newExpNode + File.separator + "operation", "submit".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); - zk.exists(s1, true);// we want to know when this node get deleted - String s2 = zk.create(newExpNode + DELIVERY_TAG_POSTFIX, longToBytes(deliveryTag), ZooDefs.Ids.OPEN_ACL_UNSAFE, // here we store the value of delivery message - CreateMode.PERSISTENT); - }else if(experimentEntry != null && GFacUtils.isCancelled(experimentID,taskID,zk) ){ - // this happens when a cancel request comes to a differnt gfac node, in this case we do not move gfac experiment - // node to gfac node specific location, because original request execution will fail with errors - log.error("This experiment is already cancelled and its already executing the cancel operation so cannot submit again !"); + if (exists1 != null) { + log.error("This request is wrong because its already running in the same instance"); return false; - } else if(experimentEntry != null && !GFacUtils.isCancelled(experimentID,taskID,zk)){ - if(ServerSettings.isGFacPassiveMode()){ - log.error("ExperimentID: " + experimentID + " taskID: " + taskID - + " was running by some Gfac instance,but it failed"); - log.info("This is an old Job, so copying data from old experiment location"); - zk.create(newExpNode, - zk.getData(experimentEntry, false, exists1), - ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } else if (experimentEntry == null) { // this means this is a very new experiment + // are going to create a new node + log.info("This is a new Job, so creating all the experiment docs from the scratch"); + Stat expParent = zk.exists(newExpNode, false); + zk.create(newExpNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); - List<String> children = zk.getChildren(experimentEntry, - false); - for (String childNode1 : children) { - String level1 = experimentEntry + File.separator - + childNode1; - Stat exists2 = zk.exists(level1, false); // no need to check exists - String newLeve1 = newExpNode + File.separator + childNode1; - log.info("Creating new znode: " + newLeve1); // these has to be info logs - zk.create(newLeve1, zk.getData(level1, false, exists2), - ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - for (String childNode2 : zk.getChildren(level1, false)) { - String level2 = level1 + File.separator + childNode2; - Stat exists3 = zk.exists(level2, false); // no need to check exists - String newLeve2 = newLeve1 + File.separator - + childNode2; - log.info("Creating new znode: " + newLeve2); - zk.create(newLeve2, zk.getData(level2, false, exists3), - ZooDefs.Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); - } - } - // After all the files are successfully transfered we delete the - // old experiment,otherwise we do - // not delete a single file - log.info("After a successful copying of experiment data for an old experiment we delete the old data"); - log.info("Deleting experiment data: " + experimentEntry); - ZKUtil.deleteRecursive(zk, experimentEntry); - }else { - log.error("ExperimentID: " + experimentID + " taskID: " + taskID - + " is already running by this Gfac instance"); - List<String> runningGfacNodeNames = AiravataZKUtils - .getAllGfacNodeNames(zk); // here we take old gfac servers - // too - for (String gfacServerNode : runningGfacNodeNames) { - if (!gfacServerNode.equals(pickedChild)) { - experimentEntry = experimentNode + File.separator - + gfacServerNode + File.separator + experimentID - + "+" + taskID; - break; - } - } - if(experimentEntry!=null) { - ZKUtil.deleteRecursive(zk, experimentEntry); - } + if (tokenId != null && expParent != null) { + zk.setData(newExpNode, tokenId.getBytes(), + expParent.getVersion()); } + String s = zk.create(newExpNode + File.separator + "state", String + .valueOf(GfacExperimentState.LAUNCHED.getValue()) + .getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + String s1 = zk.create(newExpNode + File.separator + "operation", "submit".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + zk.exists(s1, true);// we want to know when this node get deleted + String s2 = zk.create(newExpNode + DELIVERY_TAG_POSTFIX, longToBytes(deliveryTag), ZooDefs.Ids.OPEN_ACL_UNSAFE, // here we store the value of delivery message + CreateMode.PERSISTENT); + } else { + log.error("ExperimentID: " + experimentID + " taskID: " + taskID + + " was running by some Gfac instance,but it failed"); + log.info("This is an old Job, so copying data from old experiment location"); + zk.create(newExpNode, + zk.getData(experimentEntry, false, exists1), + ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + List<String> children = zk.getChildren(experimentEntry, + false); + for (String childNode1 : children) { + String level1 = experimentEntry + File.separator + + childNode1; + Stat exists2 = zk.exists(level1, false); // no need to check exists + String newLeve1 = newExpNode + File.separator + childNode1; + log.info("Creating new znode: " + newLeve1); // these has to be info logs + zk.create(newLeve1, zk.getData(level1, false, exists2), + ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + for (String childNode2 : zk.getChildren(level1, false)) { + String level2 = level1 + File.separator + childNode2; + Stat exists3 = zk.exists(level2, false); // no need to check exists + String newLeve2 = newLeve1 + File.separator + + childNode2; + log.info("Creating new znode: " + newLeve2); + zk.create(newLeve2, zk.getData(level2, false, exists3), + ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + } + } + // After all the files are successfully transfered we delete the + // old experiment,otherwise we do + // not delete a single file + log.info("After a successful copying of experiment data for an old experiment we delete the old data"); + log.info("Deleting experiment data: " + experimentEntry); + ZKUtil.deleteRecursive(zk, experimentEntry); } return true; }
