Repository: airavata Updated Branches: refs/heads/master 749ad9330 -> ac3be7ae4
Handled TerminateTask and sent ack to RabbitMQ Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/ac3be7ae Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/ac3be7ae Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/ac3be7ae Branch: refs/heads/master Commit: ac3be7ae4f057876641fcf90725566b02bb1e312 Parents: 749ad93 Author: Shameera Rathanyaka <[email protected]> Authored: Wed Jun 17 17:22:32 2015 -0400 Committer: Shameera Rathanyaka <[email protected]> Committed: Wed Jun 17 17:22:32 2015 -0400 ---------------------------------------------------------------------- .../airavata/gfac/core/GFacConstants.java | 2 + .../apache/airavata/gfac/core/GFacUtils.java | 59 ++++++---- .../airavata/gfac/server/GfacServerHandler.java | 108 ++++++------------- 3 files changed, 72 insertions(+), 97 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/ac3be7ae/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConstants.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConstants.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConstants.java index 2706f27..b662fff 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConstants.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConstants.java @@ -54,7 +54,9 @@ public class GFacConstants { public static final String ZOOKEEPER_GFAC_SERVER_NODE = "/gfac"; public static final String ZOOKEEPER_EXPERIMENT_NODE = "/experiments"; public static final String ZOOKEEPER_DELIVERYTAG_NODE = "/deliveryTag"; + public static final String ZOOKEEPER_TOKEN_NODE = "/token"; public static final String ZOOKEEPER_CANCEL_LISTENER_NODE = "/cancelListener"; + public static 
final String ZOOKEEPER_CANCEL_REQEUST = "CANCEL_REQUEST"; public static final String PROP_WORKFLOW_INSTANCE_ID = "workflow.instance.id"; public static final String PROP_WORKFLOW_NODE_ID = "workflow.node.id"; http://git-wip-us.apache.org/repos/asf/airavata/blob/ac3be7ae/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java index 63cc093..e630570 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java @@ -28,6 +28,8 @@ import org.apache.airavata.credential.store.store.CredentialReader; import org.apache.airavata.credential.store.store.impl.CredentialReaderImpl; import org.apache.airavata.gfac.core.context.JobExecutionContext; import org.apache.airavata.gfac.core.context.ProcessContext; +import org.apache.airavata.gfac.core.watcher.CancelRequestWatcher; +import org.apache.airavata.gfac.core.watcher.RedeliveryRequestWatcher; import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription; import org.apache.airavata.model.appcatalog.appdeployment.ApplicationParallelismType; import org.apache.airavata.model.appcatalog.computeresource.*; @@ -497,26 +499,13 @@ public class GFacUtils { return null; } - public static boolean setExperimentCancel(String experimentId, CuratorFramework curatorClient, long deliveryTag) throws Exception { - String experimentEntry = GFacUtils.findExperimentEntry(experimentId, curatorClient); - if (experimentEntry == null) { - // This should be handle in validation request. Gfac shouldn't get any invalidate experiment. - log.error("Cannot find the experiment Entry, so cancel operation cannot be performed. 
" + - "This happen when experiment completed and already removed from the zookeeper"); - return false; - } else { - // check cancel operation is being processed for the same experiment. - Stat cancelState = curatorClient.checkExists().forPath(experimentEntry + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX); - if (cancelState != null) { - // another cancel operation is being processed. only one cancel operation can exist for a given experiment. - return false; - } - - curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE) - .forPath(experimentEntry + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX, longToBytes(deliveryTag)); // save cancel delivery tag to be acknowledge at the end. - return true; - } - + public static boolean setExperimentCancelRequest(String experimentId, CuratorFramework curatorClient, long + deliveryTag) throws Exception { + String experimentNode = ZKPaths.makePath(GFacConstants.ZOOKEEPER_EXPERIMENT_NODE, experimentId); + String cancelListenerNodePath = ZKPaths.makePath(experimentNode, GFacConstants.ZOOKEEPER_CANCEL_LISTENER_NODE); + curatorClient.setData().withVersion(-1).forPath(cancelListenerNodePath, GFacConstants.ZOOKEEPER_CANCEL_REQEUST + .getBytes()); + return true; } public static boolean isCancelled(String experimentID, CuratorFramework curatorClient) throws Exception { @@ -712,7 +701,7 @@ public class GFacUtils { // } public static String getZKGfacServersParentPath() { - return GFacConstants.ZOOKEEPER_SERVERS_NODE + GFacConstants.ZOOKEEPER_GFAC_SERVER_NODE; + return ZKPaths.makePath(GFacConstants.ZOOKEEPER_SERVERS_NODE, GFacConstants.ZOOKEEPER_GFAC_SERVER_NODE); } public static JobDescriptor createJobDescriptor(ProcessContext processContext) throws GFacException, AppCatalogException, ApplicationSettingsException { @@ -1033,4 +1022,32 @@ public class GFacUtils { throw new GFacException("Error occurred while creating the temp job script file", e); } } + + public static String getExperimentNodePath(String experimentId) { 
+ return GFacConstants.ZOOKEEPER_EXPERIMENT_NODE + File.separator + experimentId; + } + + public static void createExperimentNode(CuratorFramework curatorClient, String gfacServerName, String + experimentId, long deliveryTag, String token) throws Exception { + // create /experiments/experimentId node and set data - serverName, add redelivery listener + String experimentPath = ZKPaths.makePath(GFacConstants.ZOOKEEPER_EXPERIMENT_NODE, experimentId); + ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), experimentPath); + curatorClient.setData().withVersion(-1).forPath(experimentPath, gfacServerName.getBytes()); + curatorClient.getData().usingWatcher(new RedeliveryRequestWatcher()).forPath(experimentPath); + + // create /experiments/experimentId/deliveryTag node and set data - deliveryTag + String deliveryTagPath = ZKPaths.makePath(experimentPath, GFacConstants.ZOOKEEPER_DELIVERYTAG_NODE); + ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), deliveryTagPath); + curatorClient.setData().withVersion(-1).forPath(deliveryTagPath, GFacUtils.longToBytes(deliveryTag)); + + // create /experiments/experimentId/token node and set data - token + String tokenNodePath = ZKPaths.makePath(experimentId, GFacConstants.ZOOKEEPER_TOKEN_NODE); + ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), tokenNodePath); + curatorClient.setData().withVersion(-1).forPath(tokenNodePath, token.getBytes()); + + // create /experiments/experimentId/cancelListener node and set watcher for data changes + String cancelListenerNode = ZKPaths.makePath(experimentPath, GFacConstants.ZOOKEEPER_CANCEL_LISTENER_NODE); + ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), cancelListenerNode); + curatorClient.getData().usingWatcher(new CancelRequestWatcher()).forPath(cancelListenerNode); + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/ac3be7ae/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java 
---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java index e8037a4..a4a4874 100644 --- a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java +++ b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java @@ -28,13 +28,11 @@ import org.apache.airavata.common.utils.LocalEventPublisher; import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.common.utils.ThriftUtils; import org.apache.airavata.common.utils.listener.AbstractActivityListener; -import org.apache.airavata.gfac.core.GFac; import org.apache.airavata.gfac.core.GFacConstants; import org.apache.airavata.gfac.core.GFacException; import org.apache.airavata.gfac.core.GFacUtils; import org.apache.airavata.gfac.cpi.GfacService; import org.apache.airavata.gfac.cpi.gfac_cpi_serviceConstants; -import org.apache.airavata.gfac.impl.BetterGfacImpl; import org.apache.airavata.gfac.impl.Factory; import org.apache.airavata.gfac.impl.GFacWorker; import org.apache.airavata.messaging.core.MessageContext; @@ -119,15 +117,14 @@ public class GfacServerHandler implements GfacService.Iface { ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), GFacConstants.ZOOKEEPER_EXPERIMENT_NODE); // create EPHEMERAL server name node String gfacName = ServerSettings.getGFacServerName(); - if (curatorClient.checkExists().forPath(GFacUtils.getZKGfacServersParentPath() + (gfacName.startsWith("/") ? - gfacName : "/" + gfacName)) == null) { - curatorClient.create().withMode(CreateMode.EPHEMERAL).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) - .forPath(GFacUtils.getZKGfacServersParentPath() + (gfacName.startsWith("/") ? 
gfacName : "/" + - gfacName)); + if (curatorClient.checkExists().forPath(ZKPaths.makePath(GFacUtils.getZKGfacServersParentPath() ,gfacName)) == null) { + curatorClient.create().withMode(CreateMode.EPHEMERAL).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) + .forPath(ZKPaths.makePath(GFacUtils.getZKGfacServersParentPath(), gfacName)); } - curatorClient.setData().withVersion(-1).forPath(GFacUtils.getZKGfacServersParentPath() + - (gfacName.startsWith("/") ? gfacName : "/" + gfacName), new String(airavataServerHostPort).getBytes()); + curatorClient.setData().withVersion(-1).forPath(ZKPaths.makePath(GFacUtils.getZKGfacServersParentPath(), + gfacName), airavataServerHostPort.getBytes()); + } public String getGFACServiceVersion() throws TException { @@ -159,16 +156,18 @@ public class GfacServerHandler implements GfacService.Iface { processId); try { - executorService.execute(new GFacWorker(experimentId,processId, gatewayId, tokenId)); + executorService.execute(new GFacWorker(experimentId, processId, gatewayId, tokenId)); } catch (GFacException e) { log.error("Failed to submit process", e); return false; + } catch (Exception e) { + log.error("Error creating zookeeper nodes"); } return true; } public boolean cancelJob(String experimentId, String taskId, String gatewayId, String tokenId) throws TException { - log.info(experimentId, "GFac Received cancel job request for Experiment: {} TaskId: {} ", experimentId, taskId); + /* log.info(experimentId, "GFac Received cancel job request for Experiment: {} TaskId: {} ", experimentId, taskId); try { if (BetterGfacImpl.getInstance().cancel(experimentId, taskId, gatewayId, tokenId)) { log.debug(experimentId, "Successfully cancelled job, experiment {} , task {}", experimentId, taskId); @@ -180,22 +179,17 @@ public class GfacServerHandler implements GfacService.Iface { } catch (Exception e) { log.error(experimentId, "Error cancelling the experiment {}.", experimentId); throw new TException("Error cancelling the experiment : " + e.getMessage(), e); 
- } + }*/ + return false; } - public ExperimentCatalog getExperimentCatalog() { - return experimentCatalog; - } - public void setExperimentCatalog(ExperimentCatalog experimentCatalog) { - this.experimentCatalog = experimentCatalog; - } public static void startStatusUpdators(ExperimentCatalog experimentCatalog, CuratorFramework curatorClient, LocalEventPublisher publisher, RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer) { - try { + /* try { String[] listenerClassList = ServerSettings.getActivityListeners(); Publisher rabbitMQPublisher = PublisherFactory.createActivityPublisher(); for (String listenerClass : listenerClassList) { @@ -208,42 +202,16 @@ public class GfacServerHandler implements GfacService.Iface { } } catch (Exception e) { log.error("Error loading the listener classes configured in airavata-server.properties", e); - } - } - private static class TestHandler implements MessageHandler{ - @Override - public Map<String, Object> getProperties() { - Map<String, Object> props = new HashMap<String, Object>(); - ArrayList<String> keys = new ArrayList<String>(); - keys.add(ServerSettings.getLaunchQueueName()); - keys.add(ServerSettings.getCancelQueueName()); - props.put(MessagingConstants.RABBIT_ROUTING_KEY, keys); - props.put(MessagingConstants.RABBIT_QUEUE, ServerSettings.getLaunchQueueName()); - return props; - } - - @Override - public void onMessage(MessageContext message) { - TaskSubmitEvent event = new TaskSubmitEvent(); - TBase messageEvent = message.getEvent(); - byte[] bytes = new byte[0]; - try { - bytes = ThriftUtils.serializeThriftObject(messageEvent); - ThriftUtils.createThriftFromBytes(bytes, event); - System.out.println(event.getExperimentId()); - } catch (TException e) { - log.error(e.getMessage(), e); - } - } + }*/ } private class TaskLaunchMessageHandler implements MessageHandler { private String experimentNode; - private String nodeName; + private String gfacServerName; public TaskLaunchMessageHandler() throws ApplicationSettingsException 
{ experimentNode = GFacConstants.ZOOKEEPER_EXPERIMENT_NODE; - nodeName = ServerSettings.getGFacServerName(); + gfacServerName = ServerSettings.getGFacServerName(); } public Map<String, Object> getProperties() { @@ -271,9 +239,8 @@ public class GfacServerHandler implements GfacService.Iface { status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis()); experimentCatalog.update(ExperimentCatalogModelType.EXPERIMENT_STATUS, status, event.getExperimentId()); try { - GFacUtils.createExperimentEntryForPassive(event.getExperimentId(), event.getTaskId(), curatorClient, - experimentNode, nodeName, event.getTokenId(), message.getDeliveryTag()); - AiravataZKUtils.getExpStatePath(event.getExperimentId()); + GFacUtils.createExperimentNode(curatorClient, gfacServerName, event.getExperimentId(), message.getDeliveryTag(), + event.getTokenId()); submitJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId(), event.getTokenId()); } catch (Exception e) { log.error(e.getMessage(), e); @@ -285,38 +252,27 @@ public class GfacServerHandler implements GfacService.Iface { log.error("Error while updating experiment status", e); } } else if (message.getType().equals(MessageType.TERMINATETASK)) { - boolean cancelSuccess = false; TaskTerminateEvent event = new TaskTerminateEvent(); TBase messageEvent = message.getEvent(); try { byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent); ThriftUtils.createThriftFromBytes(bytes, event); - boolean saveDeliveryTagSuccess = GFacUtils.setExperimentCancel(event.getExperimentId(), curatorClient, message.getDeliveryTag()); - if (saveDeliveryTagSuccess) { - cancelSuccess = cancelJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId(), event.getTokenId()); - System.out.println(" Message Received with message id '" + message.getMessageId() - + "' and with message type '" + message.getType()); - } else { - throw new GFacException("Terminate Task fail to save delivery tag : " + String.valueOf(message.getDeliveryTag()) 
+ " \n" + - "This happens when another cancel operation is being processed or experiment is in one of final states, complete|failed|cancelled."); - } + boolean success = GFacUtils.setExperimentCancelRequest(event.getExperimentId(), curatorClient, + message.getDeliveryTag()); + if (success) { + log.info("expId:{} - Experiment cancel request save successfully", event.getExperimentId()); + } } catch (Exception e) { - log.error(e.getMessage(), e); + log.error("expId:" + event.getExperimentId() + " - Experiment cancel reqeust failed", e); }finally { - if (cancelSuccess) { - // if cancel success , AiravataExperimentStatusUpdator will send an ack to this message. - } else { - try { - if (GFacUtils.ackCancelRequest(event.getExperimentId(), curatorClient)) { - if (!rabbitMQTaskLaunchConsumer.isOpen()) { - rabbitMQTaskLaunchConsumer.reconnect(); - } - rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag()); - } - } catch (Exception e) { - log.error("Error while ack to cancel request, experimentId: " + event.getExperimentId()); - } - } + try { + if (!rabbitMQTaskLaunchConsumer.isOpen()) { + rabbitMQTaskLaunchConsumer.reconnect(); + } + rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag()); + } catch (AiravataException e) { + log.error("expId: " + event.getExperimentId() + " - Failed to send acknowledgement back to cancel request.", e); + } } } }
