Repository: airavata
Updated Branches:
  refs/heads/master 26ef3e43a -> 640e1e3e5


Fixed zookeeper path issue with delivery tag acknowledgement


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/640e1e3e
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/640e1e3e
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/640e1e3e

Branch: refs/heads/master
Commit: 640e1e3e5f60aa8754afea64289916cd810d01af
Parents: 26ef3e4
Author: Shameera Rathanyaka <[email protected]>
Authored: Fri Sep 4 11:47:02 2015 -0400
Committer: Shameera Rathanyaka <[email protected]>
Committed: Fri Sep 4 11:47:02 2015 -0400

----------------------------------------------------------------------
 .../apache/airavata/gfac/core/GFacUtils.java    | 150 ++-----------------
 .../apache/airavata/gfac/impl/GFacWorker.java   |   3 +-
 .../airavata/gfac/server/GfacServerHandler.java |   5 +-
 3 files changed, 16 insertions(+), 142 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/640e1e3e/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
index d3d4c7e..1870255 100644
--- 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
+++ 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
@@ -541,32 +541,7 @@ public class GFacUtils {
         }
     }
 
-    /**
-     * This will return a value if the server is down because we iterate 
through exisiting experiment nodes, not
-     * through gfac-server nodes
-     *
-     * @param experimentID
-     * @param curatorClient
-     * @return
-     * @throws KeeperException
-     * @throws InterruptedException
-     */
-    public static String findExperimentEntry(String experimentID, 
CuratorFramework curatorClient) throws Exception {
-        String experimentNode = ZkConstants.ZOOKEEPER_EXPERIMENT_NODE;
-        List<String> children = 
curatorClient.getChildren().forPath(experimentNode);
-        for (String pickedChild : children) {
-            String experimentPath = experimentNode + File.separator + 
pickedChild;
-            String newExpNode = experimentPath + File.separator + experimentID;
-            Stat exists = curatorClient.checkExists().forPath(newExpNode);
-            if (exists == null) {
-                continue;
-            } else {
-                return newExpNode;
-            }
-        }
-        return null;
-    }
-
+       // Fixme - remove this method. with new changes we don't need to use 
this method.
     public static boolean setExperimentCancelRequest(String processId, 
CuratorFramework curatorClient, long
                    deliveryTag) throws Exception {
            String experimentNode = 
ZKPaths.makePath(ZkConstants.ZOOKEEPER_EXPERIMENT_NODE, processId);
@@ -576,56 +551,6 @@ public class GFacUtils {
            return true;
     }
 
-    public static boolean isCancelled(String experimentID, CuratorFramework 
curatorClient) throws Exception {
-        String experimentEntry = GFacUtils.findExperimentEntry(experimentID, 
curatorClient);
-        if (experimentEntry == null) {
-            return false;
-        } else {
-            Stat exists = curatorClient.checkExists().forPath(experimentEntry);
-            if (exists != null) {
-                String operation = new 
String(curatorClient.getData().storingStatIn(exists).forPath(experimentEntry + 
File.separator + "operation"));
-                if ("cancel".equals(operation)) {
-                    return true;
-                }
-            }
-        }
-        return false;
-    }
-
-//    public static void saveHandlerData(JobExecutionContext 
jobExecutionContext,
-//                                       StringBuffer data, String className) 
throws GFacHandlerException {
-//             try {
-//                     CuratorFramework curatorClient = 
jobExecutionContext.getCuratorClient();
-//                     if (curatorClient != null) {
-//                             String expZnodeHandlerPath = AiravataZKUtils
-//                                             .getExpZnodeHandlerPath(
-//                                                             
jobExecutionContext.getExperimentID(),
-//                                                             className);
-//                             Stat exists = 
curatorClient.checkExists().forPath(expZnodeHandlerPath);
-//                if (exists != null) {
-//                                     
curatorClient.setData().withVersion(exists.getVersion()).forPath(expZnodeHandlerPath,
 data.toString().getBytes());
-//                             } else {
-//                    log.error("Saving Handler data failed, Stat is null");
-//                }
-//            }
-//             } catch (Exception e) {
-//                     throw new GFacHandlerException(e);
-//             }
-//     }
-
-//    public static String getHandlerData(ProcessContext processContext, 
String className) throws Exception {
-//        CuratorFramework curatorClient = processContext.getCuratorClient();
-//        if (curatorClient != null) {
-//            String expZnodeHandlerPath = AiravataZKUtils
-//                    .getExpZnodeHandlerPath(
-//                            processContext.getExperimentID(),
-//                            className);
-//            Stat exists = 
curatorClient.checkExists().forPath(expZnodeHandlerPath);
-//            return new 
String(processContext.getCuratorClient().getData().storingStatIn(exists).forPath(expZnodeHandlerPath));
-//        }
-//        return null;
-//    }
-
     public static CredentialReader getCredentialReader()
             throws ApplicationSettingsException, IllegalAccessException,
             InstantiationException {
@@ -712,62 +637,6 @@ public class GFacUtils {
         return buffer.getLong();
     }
 
-    public static ExperimentState updateExperimentStatus(String experimentId, 
ExperimentState state) throws RegistryException {
-        ExperimentCatalog airavataExperimentCatalog = 
RegistryFactory.getDefaultExpCatalog();
-        ExperimentModel details = (ExperimentModel) 
airavataExperimentCatalog.get(ExperimentCatalogModelType.EXPERIMENT, 
experimentId);
-        if (details == null) {
-            details = new ExperimentModel();
-            details.setExperimentId(experimentId);
-        }
-        ExperimentStatus status = new ExperimentStatus();
-        status.setState(state);
-        status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
-        if 
(!ExperimentState.CANCELED.equals(details.getExperimentStatus().getState()) &&
-                
!ExperimentState.CANCELING.equals(details.getExperimentStatus().getState())) {
-            status.setState(state);
-        } else {
-            status.setState(details.getExperimentStatus().getState());
-        }
-        details.setExperimentStatus(status);
-        log.info("Updating the experiment status of experiment: " + 
experimentId + " to " + status.getState().toString());
-        
airavataExperimentCatalog.update(ExperimentCatalogModelType.EXPERIMENT_STATUS, 
status, experimentId);
-        return details.getExperimentStatus().getState();
-    }
-
-//    public static boolean isFailedJob(JobExecutionContext jec) {
-////        JobStatus jobStatus = jec.getJobDetails().getJobStatus();
-////        if (jobStatus.getJobState() == JobState.FAILED) {
-////            return true;
-////        }
-//        return false;
-//    }
-
-    public static boolean ackCancelRequest(String experimentId, 
CuratorFramework curatorClient) throws Exception {
-        String experimentEntry = GFacUtils.findExperimentEntry(experimentId, 
curatorClient);
-        String cancelNodePath = experimentEntry + 
AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX;
-        if (experimentEntry == null) {
-            // This should be handle in validation request. Gfac shouldn't get 
any invalidate experiment.
-            log.error("Cannot find the experiment Entry, so cancel operation 
cannot be performed. " +
-                    "This happen when experiment completed and already removed 
from the CuratorFramework");
-        } else {
-            // check cancel operation is being processed for the same 
experiment.
-            Stat cancelState = 
curatorClient.checkExists().forPath(cancelNodePath);
-            if (cancelState != null) {
-                
ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(), 
cancelNodePath, true);
-                return true;
-            }
-        }
-        return false;
-    }
-
-//    public static void publishTaskStatus (JobExecutionContext 
jobExecutionContext, LocalEventPublisher publisher, TaskStatus state){
-//        TaskIdentifier taskIdentity = new 
TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
-//                
jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
-//                jobExecutionContext.getExperimentID(),
-//                jobExecutionContext.getGatewayID());
-//        publisher.publish(new TaskStatusChangeRequestEvent(state, 
taskIdentity));
-//    }
-
     public static String getZKGfacServersParentPath() {
         return ZKPaths.makePath(ZkConstants.ZOOKEEPER_SERVERS_NODE, 
ZkConstants.ZOOKEEPER_GFAC_SERVER_NODE);
     }
@@ -1114,14 +983,19 @@ public class GFacUtils {
     }
 
        public static String getExperimentNodePath(String experimentId) {
-               return ZkConstants.ZOOKEEPER_EXPERIMENT_NODE + File.separator + 
experimentId;
+               return ZKPaths.makePath(ZkConstants.ZOOKEEPER_EXPERIMENT_NODE, 
experimentId);
        }
 
-       public static long getProcessDeliveryTag(CuratorFramework 
curatorClient, String processId) throws Exception {
-               String deliveryTagPath = ZkConstants.ZOOKEEPER_EXPERIMENT_NODE 
+ "/" + processId + ZkConstants
-                               .ZOOKEEPER_DELIVERYTAG_NODE;
-               byte[] bytes = curatorClient.getData().forPath(deliveryTagPath);
-               return GFacUtils.bytesToLong(bytes);
+       public static long getProcessDeliveryTag(CuratorFramework 
curatorClient, String experimentId, String processId) throws Exception {
+               String deliveryTagPath = 
ZKPaths.makePath(ZKPaths.makePath(getExperimentNodePath(experimentId), 
processId),
+                               ZkConstants.ZOOKEEPER_DELIVERYTAG_NODE);
+               Stat stat = 
curatorClient.checkExists().forPath(deliveryTagPath);
+               if (stat != null) {
+                       byte[] bytes = 
curatorClient.getData().forPath(deliveryTagPath);
+                       return GFacUtils.bytesToLong(bytes);
+               } else {
+                       throw new GFacException("Couldn't fine the deliveryTag 
path: " + deliveryTagPath);
+               }
        }
 
        public static void saveJobModel(ProcessContext processContext, JobModel 
jobModel) throws GFacException {

http://git-wip-us.apache.org/repos/asf/airavata/blob/640e1e3e/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
index 49dc45d..cdbca05 100644
--- 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
@@ -182,7 +182,8 @@ public class GFacWorker implements Runnable {
 
        private void sendAck() {
                try {
-                       long processDeliveryTag = 
GFacUtils.getProcessDeliveryTag(processContext.getCuratorClient(), processId);
+                       long processDeliveryTag = 
GFacUtils.getProcessDeliveryTag(processContext.getCuratorClient(),
+                                       processContext.getExperimentId(), 
processId);
                        
Factory.getProcessLaunchConsumer().sendAck(processDeliveryTag);
                        log.info("expId: {}, procesId: {} :- Sent ack for 
deliveryTag {}", processContext.getExperimentId(),
                                        processId, processDeliveryTag);

http://git-wip-us.apache.org/repos/asf/airavata/blob/640e1e3e/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
 
b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 1040b05..8427cdc 100644
--- 
a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ 
b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -314,7 +314,7 @@ public class GfacServerHandler implements GfacService.Iface 
{
                long deliveryTag = messageContext.getDeliveryTag();
 
                // create /experiments//{experimentId}{processId} node and set 
data - serverName, add redelivery listener
-               String experimentNodePath = 
ZkConstants.ZOOKEEPER_EXPERIMENT_NODE + "/" + experimentId;
+               String experimentNodePath = 
GFacUtils.getExperimentNodePath(experimentId);
                String zkProcessNodePath = ZKPaths.makePath(experimentNodePath, 
processId);
                
ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), 
zkProcessNodePath);
                
curatorClient.setData().withVersion(-1).forPath(zkProcessNodePath, 
gfacServerName.getBytes());
@@ -347,8 +347,7 @@ public class GfacServerHandler implements GfacService.Iface 
{
                String experimentId = event.getExperimentId();
                String processId = event.getProcessId();
                long deliveryTag = messageContext.getDeliveryTag();
-               String processNodePath = 
ZKPaths.makePath(ZKPaths.makePath(ZkConstants.ZOOKEEPER_EXPERIMENT_NODE,
-                               experimentId), processId);
+               String processNodePath = 
ZKPaths.makePath(GFacUtils.getExperimentNodePath(experimentId), processId);
                Stat stat = 
curatorClient.checkExists().forPath(processNodePath);
                if (stat != null) {
                        // create /experiments/{processId}/deliveryTag node and 
set data - deliveryTag

Reply via email to