adding support for proper acking of messages
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/1231c014 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/1231c014 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/1231c014 Branch: refs/heads/queue-gfac-rabbitmq Commit: 1231c014bebd1d23c2bdd340b7d721abe279d45a Parents: ffbb1b9 Author: Lahiru Gunathilake <[email protected]> Authored: Wed Feb 25 00:59:09 2015 -0500 Committer: Lahiru Gunathilake <[email protected]> Committed: Wed Feb 25 00:59:09 2015 -0500 ---------------------------------------------------------------------- .../airavata/api/server/AiravataAPIServer.java | 1 + .../client/samples/CreateLaunchExperiment.java | 23 +++-- .../airavata/gfac/server/GfacServerHandler.java | 48 +++++++++-- .../airavata/gfac/core/cpi/BetterGfacImpl.java | 1 + .../core/monitor/GfacInternalStatusUpdator.java | 3 + .../airavata/gfac/core/utils/GFacUtils.java | 21 +++-- .../handlers/GridPullMonitorHandler.java | 1 + .../messaging/client/RabbitMQListner.java | 4 +- .../airavata/messaging/core/MessageContext.java | 17 ++++ .../core/impl/RabbitMQTaskLaunchConsumer.java | 10 ++- .../server/OrchestratorServerHandler.java | 90 ++++++++++---------- .../util/OrchestratorRecoveryHandler.java | 1 + .../core/impl/GFACPassiveJobSubmitter.java | 10 +-- 13 files changed, 151 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/1231c014/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/AiravataAPIServer.java ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/AiravataAPIServer.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/AiravataAPIServer.java index 0e6da90..da42ce0 100644 --- 
a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/AiravataAPIServer.java +++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/AiravataAPIServer.java @@ -299,6 +299,7 @@ public class AiravataAPIServer implements IServer, Watcher{ @Override synchronized public void process(WatchedEvent watchedEvent) { + logger.info(watchedEvent.getPath()); synchronized (mutex) { Event.KeeperState state = watchedEvent.getState(); logger.info(state.name()); http://git-wip-us.apache.org/repos/asf/airavata/blob/1231c014/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java index c4c303f..78c2d71 100644 --- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java +++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java @@ -47,17 +47,17 @@ import java.util.*; public class CreateLaunchExperiment { //FIXME: Read from a config file -// public static final String THRIFT_SERVER_HOST = "localhost"; -// public static final int THRIFT_SERVER_PORT = 8930; - public static final String THRIFT_SERVER_HOST = "gw111.iu.xsede.org"; - public static final int THRIFT_SERVER_PORT = 9930; + public static final String THRIFT_SERVER_HOST = "localhost"; + public static final int THRIFT_SERVER_PORT = 8930; +// public static final String THRIFT_SERVER_HOST = "gw111.iu.xsede.org"; +// public static final int THRIFT_SERVER_PORT = 9930; private final static Logger logger = 
LoggerFactory.getLogger(CreateLaunchExperiment.class); private static final String DEFAULT_USER = "default.registry.user"; private static final String DEFAULT_GATEWAY = "default.registry.gateway"; private static Airavata.Client airavataClient; - private static String echoAppId = "Echo_a8fc8511-7b8e-431a-ad0f-de5eb1a9c576"; + private static String echoAppId = "Echo_1365a7fd-eae1-4575-b447-99afb4d79c82"; private static String mpiAppId = "HelloMPI_720e159f-198f-4daa-96ca-9f5eafee92c9"; private static String wrfAppId = "WRF_7ad5da38-c08b-417c-a9ea-da9298839762"; private static String amberAppId = "Amber_42124128-628b-484c-829d-aff8b584eb00"; @@ -93,7 +93,7 @@ public class CreateLaunchExperiment { // final String expId = createEchoExperimentForFSD(airavataClient); List<String> experimentIds = new ArrayList<String>(); try { - for (int i = 0; i < 100; i++) { + for (int i = 0; i < 1; i++) { // final String expId = createExperimentForSSHHost(airavata); // final String expId = createEchoExperimentForFSD(airavataClient); // final String expId = createMPIExperimentForFSD(airavataClient); @@ -120,12 +120,11 @@ public class CreateLaunchExperiment { launchExperiment(airavataClient, expId); } - Thread.sleep(10000); - - for(String exId:experimentIds) { - Experiment experiment = airavataClient.getExperiment(exId); - System.out.println(experiment.getExperimentStatus().toString()); - } + Thread.sleep(100); + for (String exId : experimentIds) { + Experiment experiment = airavataClient.getExperiment(exId); + System.out.println(experiment.getExperimentStatus().toString()); + } } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/airavata/blob/1231c014/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java 
b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java index 1c0f095..cca793e 100644 --- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java +++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java @@ -65,8 +65,6 @@ public class GfacServerHandler implements GfacService.Iface, Watcher { private Registry registry; private AppCatalog appCatalog; - private String registryURL; - private String gatewayName; private String airavataUserName; @@ -144,12 +142,13 @@ public class GfacServerHandler implements GfacService.Iface, Watcher { CreateMode.PERSISTENT); } String instanceId = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME); - String instantNode = gfacServer + File.separator + instanceId; - zkStat = zk.exists(instantNode, true); + String instanceNode = gfacServer + File.separator + instanceId; + zkStat = zk.exists(instanceNode, true); if (zkStat == null) { - zk.create(instantNode, + zk.create(instanceNode, airavataServerHostPort.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); // other component will watch these childeren creation deletion to monitor the status of the node + zk.getChildren(instanceNode, true); } zkStat = zk.exists(gfacExperiments, false); if (zkStat == null) { @@ -168,6 +167,8 @@ public class GfacServerHandler implements GfacService.Iface, Watcher { } synchronized public void process(WatchedEvent watchedEvent) { + logger.info(watchedEvent.getPath()); + logger.info(watchedEvent.getType().toString()); synchronized (mutex) { Event.KeeperState state = watchedEvent.getState(); logger.info(state.name()); @@ -191,10 +192,39 @@ public class GfacServerHandler implements GfacService.Iface, Watcher { } catch (KeeperException e) { logger.error(e.getMessage(), e); } + } else if (Event.EventType.NodeDeleted.equals(watchedEvent.getType())) { + String path = watchedEvent.getPath(); + 
String experimentNode = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments"); + if (path.startsWith(experimentNode)) { + // we got a watch when experiment is removed + String deliveryPath = path + GFacUtils.DELIVERY_TAG_POSTFIX; + try { + Stat exists = zk.exists(deliveryPath, false); + byte[] data = zk.getData(path + GFacUtils.DELIVERY_TAG_POSTFIX, false, exists); + long value = ByateArrayToLong(data); + logger.info("ExperimentId+taskId" + path); + logger.info("Sending Ack back to the Queue, because task is over"); + rabbitMQTaskLaunchConsumer.sendAck(value); + ZKUtil.deleteRecursive(zk,deliveryPath); + } catch (KeeperException e) { + logger.error(e.getMessage(), e); + } catch (InterruptedException e) { + logger.error(e.getMessage(), e); + } + } } } } + private long ByateArrayToLong(byte[] data) { + long value = 0; + for (int i = 0; i < data.length; i++) + { + value += ((long) data[i] & 0xffL) << (8 * i); + } + return value; + } + public String getGFACServiceVersion() throws TException { return gfac_cpi_serviceConstants.GFAC_CPI_VERSION; } @@ -314,12 +344,18 @@ public class GfacServerHandler implements GfacService.Iface, Watcher { experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments"); try { - GFacUtils.createExperimentEntryForRPC(event.getExperimentId(), event.getTaskId(), zk, experimentNode, nodeName, event.getTokenId()); + GFacUtils.createExperimentEntryForPassive(event.getExperimentId(), event.getTaskId(), zk, experimentNode, nodeName, event.getTokenId(), message.getDeliveryTag()); + AiravataZKUtils.getExpStatePath(event.getExperimentId(),event.getTaskId()); submitJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId()); } catch (KeeperException e) { logger.error(nodeName + " was interrupted."); + rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag()); } catch (InterruptedException e) { logger.error(e.getMessage(), e); + 
rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag()); + } catch (ApplicationSettingsException e) { + logger.error(e.getMessage(), e); + rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag()); } System.out.println(" Message Received with message id '" + message.getMessageId() + "' and with message type '" + message.getType()); http://git-wip-us.apache.org/repos/asf/airavata/blob/1231c014/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java index bb612a6..00930e5 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java @@ -1158,6 +1158,7 @@ public class BetterGfacImpl implements GFac,Watcher { } public void process(WatchedEvent watchedEvent) { + log.info(watchedEvent.getPath()); if(Event.EventType.NodeDataChanged.equals(watchedEvent.getType())){ // node data is changed, this means node is cancelled. 
log.info("Experiment is cancelled with this path:"+watchedEvent.getPath()); http://git-wip-us.apache.org/repos/asf/airavata/blob/1231c014/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java index 7818da0..26902e7 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java @@ -29,6 +29,7 @@ import org.apache.airavata.common.utils.Constants; import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.common.utils.listener.AbstractActivityListener; import org.apache.airavata.gfac.core.monitor.state.GfacExperimentStateChangeRequest; +import org.apache.airavata.gfac.core.utils.GFacUtils; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; @@ -49,6 +50,7 @@ public class GfacInternalStatusUpdator implements AbstractActivityListener, Watc MonitorID monitorID = statusChangeRequest.getMonitorID(); String experimentPath = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments") + File.separator + ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME) + File.separator + statusChangeRequest.getMonitorID().getExperimentID() + "+" + monitorID.getTaskID(); + String deliveryTagPath = experimentPath + GFacUtils.DELIVERY_TAG_POSTFIX; Stat exists = null; try { if (!zk.getState().isConnected()) { @@ -107,6 +109,7 @@ public class GfacInternalStatusUpdator implements AbstractActivityListener, Watc } public void process(WatchedEvent watchedEvent) { + 
logger.info(watchedEvent.getPath()); synchronized (mutex) { Event.KeeperState state = watchedEvent.getState(); if (state == Event.KeeperState.SyncConnected) { http://git-wip-us.apache.org/repos/asf/airavata/blob/1231c014/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java index 9f104fa..c825ffd 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java @@ -60,12 +60,14 @@ import java.io.*; import java.net.InetAddress; import java.net.URISyntaxException; import java.net.UnknownHostException; +import java.nio.ByteBuffer; import java.util.*; //import org.apache.airavata.commons.gfac.type.ActualParameter; public class GFacUtils { private final static Logger log = LoggerFactory.getLogger(GFacUtils.class); + public static final String DELIVERY_TAG_POSTFIX = "-deliveryTag"; private GFacUtils() { } @@ -1156,7 +1158,7 @@ public class GFacUtils { // This method is dangerous because of moving the experiment data public static boolean createExperimentEntryForPassive(String experimentID, String taskID, ZooKeeper zk, String experimentNode, - String pickedChild, String tokenId) throws KeeperException, + String pickedChild, String tokenId,long deliveryTag) throws KeeperException, InterruptedException { String experimentPath = experimentNode + File.separator + pickedChild; String newExpNode = experimentPath + File.separator + experimentID @@ -1165,15 +1167,14 @@ public class GFacUtils { String experimentEntry = GFacUtils.findExperimentEntry(experimentID, taskID, zk); String foundExperimentPath = null; if (exists1 == null && experimentEntry == null) { // 
this means this is a very new experiment - List<String> runningGfacNodeNames = AiravataZKUtils - .getAllGfacNodeNames(zk); // here we take old gfac servers - // too + List<String> runningGfacNodeNames = AiravataZKUtils.getAllGfacNodeNames(zk); // here we take old gfac servers + for (String gfacServerNode : runningGfacNodeNames) { if (!gfacServerNode.equals(pickedChild)) { foundExperimentPath = experimentNode + File.separator + gfacServerNode + File.separator + experimentID + "+" + taskID; - exists1 = zk.exists(foundExperimentPath, false); + exists1 = zk.exists(foundExperimentPath, true); if (exists1 != null) { // when the experiment is found we // break the loop break; @@ -1183,21 +1184,23 @@ public class GFacUtils { if (exists1 == null) { // OK this is a pretty new experiment so we // are going to create a new node log.info("This is a new Job, so creating all the experiment docs from the scratch"); + Stat expParent = zk.exists(newExpNode, false); zk.create(newExpNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - Stat expParent = zk.exists(newExpNode, false); if (tokenId != null && expParent != null) { zk.setData(newExpNode, tokenId.getBytes(), expParent.getVersion()); } - zk.create(newExpNode + File.separator + "state", String + String s = zk.create(newExpNode + File.separator + "state", String .valueOf(GfacExperimentState.LAUNCHED.getValue()) .getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create(newExpNode + File.separator + "operation","submit".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, + String s1 = zk.create(newExpNode + File.separator + "operation", "submit".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + zk.exists(s1, true);// we want to know when this node get deleted + String s2 = zk.create(newExpNode + DELIVERY_TAG_POSTFIX, ByteBuffer.allocate(8).putLong(deliveryTag).array(), ZooDefs.Ids.OPEN_ACL_UNSAFE, // here we store the value of delivery message CreateMode.PERSISTENT); - } else { // 
ohhh this node exists in some other failed gfac folder, we // have to move it to this gfac experiment list,safely http://git-wip-us.apache.org/repos/asf/airavata/blob/1231c014/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java index e64f596..d5f9f90 100644 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java @@ -125,6 +125,7 @@ public class GridPullMonitorHandler extends ThreadedHandler implements Watcher{ public void process(WatchedEvent watchedEvent) { + logger.info(watchedEvent.getPath()); if(Event.EventType.NodeDataChanged.equals(watchedEvent.getType())){ // node data is changed, this means node is cancelled. 
logger.info("Experiment is cancelled with this path:"+watchedEvent.getPath()); http://git-wip-us.apache.org/repos/asf/airavata/blob/1231c014/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListner.java ---------------------------------------------------------------------- diff --git a/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListner.java b/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListner.java index 601497a..48edbe8 100644 --- a/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListner.java +++ b/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListner.java @@ -28,7 +28,7 @@ import org.apache.airavata.common.utils.ThriftUtils; import org.apache.airavata.messaging.core.MessageContext; import org.apache.airavata.messaging.core.MessageHandler; import org.apache.airavata.messaging.core.MessagingConstants; -import org.apache.airavata.messaging.core.impl.RabbitMQConsumer; +import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer; import org.apache.airavata.model.messaging.event.*; import org.apache.airavata.model.workspace.experiment.ExperimentState; import org.apache.commons.cli.*; @@ -67,7 +67,7 @@ public class RabbitMQListner { String brokerUrl = ServerSettings.getSetting(RABBITMQ_BROKER_URL); System.out.println("broker url " + brokerUrl); final String exchangeName = ServerSettings.getSetting(RABBITMQ_EXCHANGE_NAME); - RabbitMQConsumer consumer = new RabbitMQConsumer(brokerUrl, exchangeName); + RabbitMQStatusConsumer consumer = new RabbitMQStatusConsumer(brokerUrl, exchangeName); consumer.listen(new MessageHandler() { @Override public Map<String, Object> getProperties() { http://git-wip-us.apache.org/repos/asf/airavata/blob/1231c014/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java 
---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java index 0a39d92..272f413 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java @@ -32,6 +32,7 @@ public class MessageContext { private final String messageId; private final String gatewayId; private Timestamp updatedTime; + private long deliveryTag; public MessageContext(TBase message, MessageType type, String messageId, String gatewayId) { @@ -41,6 +42,14 @@ public class MessageContext { this.gatewayId = gatewayId; } + public MessageContext(TBase event, MessageType type, String messageId, String gatewayId, long deliveryTag) { + this.event = event; + this.type = type; + this.messageId = messageId; + this.gatewayId = gatewayId; + this.deliveryTag = deliveryTag; + } + public TBase getEvent() { return event; } @@ -64,4 +73,12 @@ public class MessageContext { public String getGatewayId() { return gatewayId; } + + public long getDeliveryTag() { + return deliveryTag; + } + + public void setDeliveryTag(long deliveryTag) { + this.deliveryTag = deliveryTag; + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/1231c014/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java index 1c7b0e8..7c88a25 100644 --- 
a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java @@ -165,7 +165,7 @@ public class RabbitMQTaskLaunchConsumer { event = taskTerminateEvent; gatewayId = null; } - MessageContext messageContext = new MessageContext(event, message.getMessageType(), message.getMessageId(), gatewayId); + MessageContext messageContext = new MessageContext(event, message.getMessageType(), message.getMessageId(), gatewayId,deliveryTag); messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime())); handler.onMessage(messageContext); try { @@ -241,4 +241,12 @@ public class RabbitMQTaskLaunchConsumer { } } } + + public void sendAck(long deliveryTag){ + try { + channel.basicAck(deliveryTag,false); //todo move this logic to monitoring component to ack when the job is done + } catch (IOException e) { + logger.error(e.getMessage(), e); + } + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/1231c014/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java index b200468..f430bc9 100644 --- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java +++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java @@ -292,43 +292,45 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface, * This 
method gracefully handler gfac node failures */ synchronized public void process(WatchedEvent watchedEvent) { + log.info(watchedEvent.getPath()); synchronized (mutex) { try { Event.KeeperState state = watchedEvent.getState(); switch (state) { - case SyncConnected: - mutex.notify(); - break; - case Expired:case Disconnected: - try { - zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), 6000, this); - synchronized (mutex) { - mutex.wait(); // waiting for the syncConnected event - } - String airavataServerHostPort = ServerSettings - .getSetting(Constants.ORCHESTRATOR_SERVER_HOST) - + ":" - + ServerSettings - .getSetting(Constants.ORCHESTRATOR_SERVER_PORT); - String OrchServer = ServerSettings - .getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_ORCHESTRATOR_SERVER_NODE); - registerOrchestratorService(airavataServerHostPort, OrchServer); - } catch (IOException e) { - e.printStackTrace(); - } catch (ApplicationSettingsException e) { - e.printStackTrace(); - } catch (InterruptedException e) { - e.printStackTrace(); - } catch (KeeperException e) { - e.printStackTrace(); - } - break; - } + case SyncConnected: + mutex.notify(); + break; + case Expired: + case Disconnected: + try { + zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), 6000, this); + synchronized (mutex) { + mutex.wait(); // waiting for the syncConnected event + } + String airavataServerHostPort = ServerSettings + .getSetting(Constants.ORCHESTRATOR_SERVER_HOST) + + ":" + + ServerSettings + .getSetting(Constants.ORCHESTRATOR_SERVER_PORT); + String OrchServer = ServerSettings + .getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_ORCHESTRATOR_SERVER_NODE); + registerOrchestratorService(airavataServerHostPort, OrchServer); + } catch (IOException e) { + e.printStackTrace(); + } catch (ApplicationSettingsException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (KeeperException e) { + e.printStackTrace(); + } + break; + } if 
(watchedEvent.getPath() != null && watchedEvent.getPath().startsWith( - ServerSettings.getSetting( - Constants.ZOOKEEPER_GFAC_SERVER_NODE, - "/gfac-server"))) { + ServerSettings.getSetting( + Constants.ZOOKEEPER_GFAC_SERVER_NODE, + "/gfac-server"))) { List<String> children = zk.getChildren(ServerSettings .getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server"), true); @@ -340,18 +342,18 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface, + File.separator + gfacNodes, this); } switch (watchedEvent.getType()) { - case NodeCreated: - mutex.notify(); - break; - case NodeDeleted: - // here we have to handle gfac node shutdown case - if (children.size() == 0) { - log.error("There are not gfac instances to route failed jobs"); - return; - } - // we recover one gfac node at a time - final WatchedEvent event = watchedEvent; - final OrchestratorServerHandler handler = this; + case NodeCreated: + mutex.notify(); + break; + case NodeDeleted: + // here we have to handle gfac node shutdown case + if (children.size() == 0) { + log.error("There are not gfac instances to route failed jobs"); + return; + } + // we recover one gfac node at a time + final WatchedEvent event = watchedEvent; + final OrchestratorServerHandler handler = this; /*(new Thread() { // disabling ft implementation with zk public void run() { int retry = 0; @@ -372,7 +374,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface, } }).start();*/ - break; + break; } http://git-wip-us.apache.org/repos/asf/airavata/blob/1231c014/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java 
b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java index fb3bd51..f19b949 100644 --- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java +++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java @@ -95,6 +95,7 @@ public class OrchestratorRecoveryHandler implements Watcher { } synchronized public void process(WatchedEvent watchedEvent) { + log.info(watchedEvent.getPath()); synchronized (mutex) { Event.KeeperState state = watchedEvent.getState(); switch (state) { http://git-wip-us.apache.org/repos/asf/airavata/blob/1231c014/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java index b5e25b1..8066113 100644 --- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java +++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java @@ -187,11 +187,9 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher { String[] split = gfacNodeData.split(":"); if (zk.exists(gfacServer + File.separator + pickedChild, false) != null) { // before submitting the job we check again the state of the node - if (GFacUtils.createExperimentEntryForRPC(experimentID, taskID, zk, experimentNode, pickedChild, null)) { - TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent(experimentID, taskID, 
null,null); - MessageContext messageContext = new MessageContext(taskSubmitEvent, MessageType.LAUNCHTASK,"LAUNCH.TERMINATE-"+ UUID.randomUUID().toString(),null); - publisher.publish(messageContext); - } + TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent(experimentID, taskID, null, null); + MessageContext messageContext = new MessageContext(taskSubmitEvent, MessageType.TERMINATETASK, "LAUNCH.TERMINATE-" + UUID.randomUUID().toString(), null); + publisher.publish(messageContext); } } } catch (InterruptedException e) { @@ -217,6 +215,8 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher { } synchronized public void process(WatchedEvent event) { + logger.info(getClass().getName() + event.getPath()); + logger.info(getClass().getName()+event.getType()); synchronized (mutex) { switch (event.getState()) { case SyncConnected:
