Repository: airavata Updated Branches: refs/heads/master 26ef3e43a -> 640e1e3e5
Fixed zookeeper path issue with delivery tag acknowledgement Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/640e1e3e Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/640e1e3e Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/640e1e3e Branch: refs/heads/master Commit: 640e1e3e5f60aa8754afea64289916cd810d01af Parents: 26ef3e4 Author: Shameera Rathanyaka <[email protected]> Authored: Fri Sep 4 11:47:02 2015 -0400 Committer: Shameera Rathanyaka <[email protected]> Committed: Fri Sep 4 11:47:02 2015 -0400 ---------------------------------------------------------------------- .../apache/airavata/gfac/core/GFacUtils.java | 150 ++----------------- .../apache/airavata/gfac/impl/GFacWorker.java | 3 +- .../airavata/gfac/server/GfacServerHandler.java | 5 +- 3 files changed, 16 insertions(+), 142 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/640e1e3e/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java index d3d4c7e..1870255 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java @@ -541,32 +541,7 @@ public class GFacUtils { } } - /** - * This will return a value if the server is down because we iterate through exisiting experiment nodes, not - * through gfac-server nodes - * - * @param experimentID - * @param curatorClient - * @return - * @throws KeeperException - * @throws InterruptedException - */ - public static String findExperimentEntry(String experimentID, CuratorFramework curatorClient) throws Exception { - String experimentNode = ZkConstants.ZOOKEEPER_EXPERIMENT_NODE; - List<String> children = curatorClient.getChildren().forPath(experimentNode); - for (String pickedChild : children) { - String experimentPath = experimentNode + File.separator + pickedChild; - String newExpNode = experimentPath + File.separator + experimentID; - Stat exists = curatorClient.checkExists().forPath(newExpNode); - if (exists == null) { - continue; - } else { - return newExpNode; - } - } - return null; - } - + // Fixme - remove this method. with new changes we don't need to use this method. public static boolean setExperimentCancelRequest(String processId, CuratorFramework curatorClient, long deliveryTag) throws Exception { String experimentNode = ZKPaths.makePath(ZkConstants.ZOOKEEPER_EXPERIMENT_NODE, processId); @@ -576,56 +551,6 @@ public class GFacUtils { return true; } - public static boolean isCancelled(String experimentID, CuratorFramework curatorClient) throws Exception { - String experimentEntry = GFacUtils.findExperimentEntry(experimentID, curatorClient); - if (experimentEntry == null) { - return false; - } else { - Stat exists = curatorClient.checkExists().forPath(experimentEntry); - if (exists != null) { - String operation = new String(curatorClient.getData().storingStatIn(exists).forPath(experimentEntry + File.separator + "operation")); - if ("cancel".equals(operation)) { - return true; - } - } - } - return false; - } - -// public static void saveHandlerData(JobExecutionContext jobExecutionContext, -// StringBuffer data, String className) throws GFacHandlerException { -// try { -// CuratorFramework curatorClient = jobExecutionContext.getCuratorClient(); -// if (curatorClient != null) { -// String expZnodeHandlerPath = AiravataZKUtils -// .getExpZnodeHandlerPath( -// jobExecutionContext.getExperimentID(), -// className); -// Stat exists = curatorClient.checkExists().forPath(expZnodeHandlerPath); -// if (exists != null) { -// curatorClient.setData().withVersion(exists.getVersion()).forPath(expZnodeHandlerPath, data.toString().getBytes()); -// } else { -// log.error("Saving Handler data failed, Stat is null"); -// } -// } -// } catch (Exception e) { -// throw new GFacHandlerException(e); -// } -// } - -// public static String getHandlerData(ProcessContext processContext, String className) throws Exception { -// CuratorFramework curatorClient = processContext.getCuratorClient(); -// if (curatorClient != null) { -// String expZnodeHandlerPath = AiravataZKUtils -// .getExpZnodeHandlerPath( -// processContext.getExperimentID(), -// className); -// Stat exists = curatorClient.checkExists().forPath(expZnodeHandlerPath); -// return new String(processContext.getCuratorClient().getData().storingStatIn(exists).forPath(expZnodeHandlerPath)); -// } -// return null; -// } - public static CredentialReader getCredentialReader() throws ApplicationSettingsException, IllegalAccessException, InstantiationException { @@ -712,62 +637,6 @@ public class GFacUtils { return buffer.getLong(); } - public static ExperimentState updateExperimentStatus(String experimentId, ExperimentState state) throws RegistryException { - ExperimentCatalog airavataExperimentCatalog = RegistryFactory.getDefaultExpCatalog(); - ExperimentModel details = (ExperimentModel) airavataExperimentCatalog.get(ExperimentCatalogModelType.EXPERIMENT, experimentId); - if (details == null) { - details = new ExperimentModel(); - details.setExperimentId(experimentId); - } - ExperimentStatus status = new ExperimentStatus(); - status.setState(state); - status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis()); - if (!ExperimentState.CANCELED.equals(details.getExperimentStatus().getState()) && - !ExperimentState.CANCELING.equals(details.getExperimentStatus().getState())) { - status.setState(state); - } else { - status.setState(details.getExperimentStatus().getState()); - } - details.setExperimentStatus(status); - log.info("Updating the experiment status of experiment: " + experimentId + " to " + status.getState().toString()); - airavataExperimentCatalog.update(ExperimentCatalogModelType.EXPERIMENT_STATUS, status, experimentId); - return details.getExperimentStatus().getState(); - } - -// public static boolean isFailedJob(JobExecutionContext jec) { -//// JobStatus jobStatus = jec.getJobDetails().getJobStatus(); -//// if (jobStatus.getJobState() == JobState.FAILED) { -//// return true; -//// } -// return false; -// } - - public static boolean ackCancelRequest(String experimentId, CuratorFramework curatorClient) throws Exception { - String experimentEntry = GFacUtils.findExperimentEntry(experimentId, curatorClient); - String cancelNodePath = experimentEntry + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX; - if (experimentEntry == null) { - // This should be handle in validation request. Gfac shouldn't get any invalidate experiment. - log.error("Cannot find the experiment Entry, so cancel operation cannot be performed. " + - "This happen when experiment completed and already removed from the CuratorFramework"); - } else { - // check cancel operation is being processed for the same experiment. - Stat cancelState = curatorClient.checkExists().forPath(cancelNodePath); - if (cancelState != null) { - ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(), cancelNodePath, true); - return true; - } - } - return false; - } - -// public static void publishTaskStatus (JobExecutionContext jobExecutionContext, LocalEventPublisher publisher, TaskStatus state){ -// TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(), -// jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), -// jobExecutionContext.getExperimentID(), -// jobExecutionContext.getGatewayID()); -// publisher.publish(new TaskStatusChangeRequestEvent(state, taskIdentity)); -// } - public static String getZKGfacServersParentPath() { return ZKPaths.makePath(ZkConstants.ZOOKEEPER_SERVERS_NODE, ZkConstants.ZOOKEEPER_GFAC_SERVER_NODE); } @@ -1114,14 +983,19 @@ public class GFacUtils { } public static String getExperimentNodePath(String experimentId) { - return ZkConstants.ZOOKEEPER_EXPERIMENT_NODE + File.separator + experimentId; + return ZKPaths.makePath(ZkConstants.ZOOKEEPER_EXPERIMENT_NODE, experimentId); } - public static long getProcessDeliveryTag(CuratorFramework curatorClient, String processId) throws Exception { - String deliveryTagPath = ZkConstants.ZOOKEEPER_EXPERIMENT_NODE + "/" + processId + ZkConstants - .ZOOKEEPER_DELIVERYTAG_NODE; - byte[] bytes = curatorClient.getData().forPath(deliveryTagPath); - return GFacUtils.bytesToLong(bytes); + public static long getProcessDeliveryTag(CuratorFramework curatorClient, String experimentId, String processId) throws Exception { + String deliveryTagPath = ZKPaths.makePath(ZKPaths.makePath(getExperimentNodePath(experimentId), processId), + ZkConstants.ZOOKEEPER_DELIVERYTAG_NODE); + Stat stat = curatorClient.checkExists().forPath(deliveryTagPath); + if (stat != null) { + byte[] bytes = curatorClient.getData().forPath(deliveryTagPath); + return GFacUtils.bytesToLong(bytes); + } else { + throw new GFacException("Couldn't fine the deliveryTag path: " + deliveryTagPath); + } } public static void saveJobModel(ProcessContext processContext, JobModel jobModel) throws GFacException { http://git-wip-us.apache.org/repos/asf/airavata/blob/640e1e3e/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java index 49dc45d..cdbca05 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java @@ -182,7 +182,8 @@ public class GFacWorker implements Runnable { private void sendAck() { try { - long processDeliveryTag = GFacUtils.getProcessDeliveryTag(processContext.getCuratorClient(), processId); + long processDeliveryTag = GFacUtils.getProcessDeliveryTag(processContext.getCuratorClient(), + processContext.getExperimentId(), processId); Factory.getProcessLaunchConsumer().sendAck(processDeliveryTag); log.info("expId: {}, procesId: {} :- Sent ack for deliveryTag {}", processContext.getExperimentId(), processId, processDeliveryTag); http://git-wip-us.apache.org/repos/asf/airavata/blob/640e1e3e/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java index 1040b05..8427cdc 100644 --- a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java +++ b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java @@ -314,7 +314,7 @@ public class GfacServerHandler implements GfacService.Iface { long deliveryTag = messageContext.getDeliveryTag(); // create /experiments//{experimentId}{processId} node and set data - serverName, add redelivery listener - String experimentNodePath = ZkConstants.ZOOKEEPER_EXPERIMENT_NODE + "/" + experimentId; + String experimentNodePath = GFacUtils.getExperimentNodePath(experimentId); String zkProcessNodePath = ZKPaths.makePath(experimentNodePath, processId); ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), zkProcessNodePath); curatorClient.setData().withVersion(-1).forPath(zkProcessNodePath, gfacServerName.getBytes()); @@ -347,8 +347,7 @@ public class GfacServerHandler implements GfacService.Iface { String experimentId = event.getExperimentId(); String processId = event.getProcessId(); long deliveryTag = messageContext.getDeliveryTag(); - String processNodePath = ZKPaths.makePath(ZKPaths.makePath(ZkConstants.ZOOKEEPER_EXPERIMENT_NODE, - experimentId), processId); + String processNodePath = ZKPaths.makePath(GFacUtils.getExperimentNodePath(experimentId), processId); Stat stat = curatorClient.checkExists().forPath(processNodePath); if (stat != null) { // create /experiments/{processId}/deliveryTag node and set data - deliveryTag
