implementing queue submission without curator
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/840e627b Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/840e627b Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/840e627b Branch: refs/heads/master Commit: 840e627b4e24baef8dbf62df8da1042380cb8af1 Parents: 60788ef Author: Lahiru Gunathilake <[email protected]> Authored: Tue Feb 17 23:58:04 2015 -0500 Committer: Lahiru Gunathilake <[email protected]> Committed: Tue Feb 17 23:58:04 2015 -0500 ---------------------------------------------------------------------- .../client/samples/CreateLaunchExperiment.java | 17 ++- .../airavata/common/utils/AiravataZKUtils.java | 22 +++ .../airavata/gfac/server/GfacServerHandler.java | 133 +++++-------------- .../messaging/core/impl/RabbitMQProducer.java | 18 ++- .../core/impl/RabbitMQTaskLaunchConsumer.java | 22 +-- .../core/impl/RabbitMQTaskLaunchPublisher.java | 5 +- 6 files changed, 102 insertions(+), 115 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/840e627b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java index 6937c25..1e9d983 100644 --- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java +++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java @@ -94,14 +94,16 @@ public class CreateLaunchExperiment { public static void createAndLaunchExp() throws TException { // final String expId = createEchoExperimentForFSD(airavataClient); + List<String> experimentIds = new ArrayList<String>(); try { - for (int i = 0; i < 10; i++) { + for (int i = 0; i < 100; i++) { // final String expId = createExperimentForSSHHost(airavata); // final String expId = createEchoExperimentForFSD(airavataClient); // final String expId = createMPIExperimentForFSD(airavataClient); // final String expId = createEchoExperimentForStampede(airavataClient); // final String expId = createEchoExperimentForTrestles(airavataClient); final String expId = createExperimentEchoForLocalHost(airavataClient); + experimentIds.add(expId); // final String expId = createExperimentWRFTrestles(airavataClient); // final String expId = createExperimentForBR2(airavataClient); // final String expId = createExperimentForBR2Amber(airavataClient); @@ -115,11 +117,20 @@ public class CreateLaunchExperiment { // final String expId = createExperimentTRINITYStampede(airavataClient); // final String expId = createExperimentAUTODOCKStampede(airavataClient); // this is not working , we need to register AutoDock app on stampede // final String expId = "Ultrascan_ln_eb029947-391a-4ccf-8ace-9bafebe07cc0"; - System.out.println("Experiment ID : " + expId); + System.out.println("Experiment ID : " + expId); // updateExperiment(airavata, expId); - + launchExperiment(airavataClient, expId); } + + Thread.sleep(10000); + + for(String exId:experimentIds) { + Experiment experiment = airavataClient.getExperiment(exId); + System.out.println(experiment.getExperimentStatus().toString()); + } + + } catch (Exception e) { logger.error("Error while connecting with server", e.getMessage()); e.printStackTrace(); http://git-wip-us.apache.org/repos/asf/airavata/blob/840e627b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java ---------------------------------------------------------------------- diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java index f91fc3c..46f06f1 100644 --- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java +++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java @@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.net.URL; +import java.nio.ByteBuffer; import java.util.List; public class AiravataZKUtils { @@ -172,4 +173,25 @@ public class AiravataZKUtils { logger.info("Skipping Zookeeper embedded startup ..."); } } + + public static void storeDeliveryTag(ZooKeeper zk,String newExpNode,Double deliveryTag) throws KeeperException, InterruptedException { + String s = zk.create(newExpNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + + Stat expParent = zk.exists(newExpNode, false); + if (expParent != null) { + zk.setData(newExpNode, toByteArray(deliveryTag), + expParent.getVersion()); + } + } + + public static byte[] toByteArray(double value) { + byte[] bytes = new byte[8]; + ByteBuffer.wrap(bytes).putDouble(value); + return bytes; + } + + public static double toDouble(byte[] bytes) { + return ByteBuffer.wrap(bytes).getDouble(); + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/840e627b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java index 679a5ee..1c0f095 100644 --- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java +++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java @@ -45,30 +45,23 @@ import org.apache.airavata.model.messaging.event.TaskTerminateEvent; import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory; import org.apache.airavata.registry.cpi.Registry; import org.apache.airavata.registry.cpi.RegistryException; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.framework.recipes.leader.LeaderSelector; -import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter; -import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.thrift.TBase; import org.apache.thrift.TException; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; -import java.io.Closeable; import java.io.File; import java.io.IOException; import java.util.*; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -public class GfacServerHandler implements GfacService.Iface, Watcher{ +public class GfacServerHandler implements GfacService.Iface, Watcher { private final static AiravataLogger logger = AiravataLoggerFactory.getLogger(GfacServerHandler.class); + private RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer; + private Registry registry; private AppCatalog appCatalog; @@ -80,8 +73,6 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{ private ZooKeeper zk; - private boolean connected = false; - private static Integer mutex = -1; private MonitorPublisher publisher; @@ -94,16 +85,10 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{ private List<Future> inHandlerFutures; - private String nodeName = null; - - private CuratorFramework curatorFramework = null; private BlockingQueue<TaskSubmitEvent> taskSubmitEvents; - private BlockingQueue<TaskTerminateEvent> taskTerminateEvents; - - private CuratorClient curatorClient; - public GfacServerHandler() throws Exception{ + public GfacServerHandler() throws Exception { // registering with zk try { String zkhostPort = AiravataZKUtils.getZKhostPort(); @@ -112,7 +97,6 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{ zk = new ZooKeeper(zkhostPort, 6000, this); // no watcher is required, this will only use to store some data gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server"); gfacExperiments = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments"); - nodeName = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME); synchronized (mutex) { mutex.wait(); // waiting for the syncConnected event } @@ -128,17 +112,11 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{ inHandlerFutures = new ArrayList<Future>(); if (ServerSettings.isGFacPassiveMode()) { - taskSubmitEvents = new LinkedBlockingDeque<TaskSubmitEvent>(); - taskTerminateEvents = new LinkedBlockingDeque<TaskTerminateEvent>(); - curatorFramework = CuratorFrameworkFactory.newClient(AiravataZKUtils.getZKhostPort(), new ExponentialBackoffRetry(1000, 3)); - curatorClient = new CuratorClient(curatorFramework, nodeName); + rabbitMQTaskLaunchConsumer = new RabbitMQTaskLaunchConsumer(); + rabbitMQTaskLaunchConsumer.listen(new TaskLaunchMessageHandler()); - curatorFramework.start(); - curatorClient.start(); } - - - } catch (ApplicationSettingsException e) { + } catch (ApplicationSettingsException e) { logger.error("Error initialising GFAC", e); throw new Exception("Error initialising GFAC", e); } catch (InterruptedException e) { @@ -184,7 +162,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{ zk.create(gfacExperiments + File.separator + instanceId, airavataServerHostPort.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - }else{ + } else { logger.error(" Zookeeper is inconsistent state !!!!!"); } } @@ -195,9 +173,8 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{ logger.info(state.name()); if (state == Event.KeeperState.SyncConnected) { mutex.notify(); - connected = true; - } else if(state == Event.KeeperState.Expired || - state == Event.KeeperState.Disconnected){ + } else if (state == Event.KeeperState.Expired || + state == Event.KeeperState.Disconnected) { try { mutex = -1; zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), 6000, this); @@ -292,24 +269,29 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{ public void setAiravataUserName(String airavataUserName) { this.airavataUserName = airavataUserName; } + protected void setGatewayProperties() throws ApplicationSettingsException { - setAiravataUserName(ServerSettings.getDefaultUser()); - setGatewayName(ServerSettings.getDefaultUserGateway()); - } + setAiravataUserName(ServerSettings.getDefaultUser()); + setGatewayName(ServerSettings.getDefaultUserGateway()); + } - private GFac getGfac()throws TException{ + private GFac getGfac() throws TException { try { - return new BetterGfacImpl(registry, appCatalog, zk,publisher); + return new BetterGfacImpl(registry, appCatalog, zk, publisher); } catch (Exception e) { - throw new TException("Error initializing gfac instance",e); + throw new TException("Error initializing gfac instance", e); } } private class TaskLaunchMessageHandler implements MessageHandler { public static final String LAUNCH_TASK = "launch.task"; public static final String TERMINATE_TASK = "teminate.task"; - public TaskLaunchMessageHandler(){ + private String experimentNode; + private String nodeName; + public TaskLaunchMessageHandler() { + experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments"); + nodeName = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME,"gfac-node0"); } public Map<String, Object> getProperties() { @@ -318,6 +300,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{ keys.add(LAUNCH_TASK); keys.add(TERMINATE_TASK); props.put(MessagingConstants.RABBIT_ROUTING_KEY, keys); + props.put(MessagingConstants.RABBIT_QUEUE, LAUNCH_TASK); return props; } @@ -328,9 +311,16 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{ TBase messageEvent = message.getEvent(); byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent); ThriftUtils.createThriftFromBytes(bytes, event); - taskSubmitEvents.add(event); - - + experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments"); + + try { + GFacUtils.createExperimentEntryForRPC(event.getExperimentId(), event.getTaskId(), zk, experimentNode, nodeName, event.getTokenId()); + submitJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId()); + } catch (KeeperException e) { + logger.error(nodeName + " was interrupted."); + } catch (InterruptedException e) { + logger.error(e.getMessage(), e); + } System.out.println(" Message Received with message id '" + message.getMessageId() + "' and with message type '" + message.getType()); } catch (TException e) { @@ -351,61 +341,4 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{ } } } - - public class CuratorClient extends LeaderSelectorListenerAdapter implements Closeable { - private final String name; - private final LeaderSelector leaderSelector; - private final AtomicInteger leaderCount = new AtomicInteger(); - private final String path; - private String experimentNode; - - public CuratorClient(CuratorFramework client, String name) { - this.name = name; - // create a leader selector using the given path for management - // all participants in a given leader selection must use the same path - // ExampleClient here is also a LeaderSelectorListener but this isn't required - experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments"); - path = experimentNode + File.separator + "leader"; - leaderSelector = new LeaderSelector(client, path, this); - // for most cases you will want your instance to requeue when it relinquishes leadership - leaderSelector.autoRequeue(); - } - - public void start() throws IOException { - // the selection for this instance doesn't start until the leader selector is started - // leader selection is done in the background so this call to leaderSelector.start() returns immediately - leaderSelector.start(); - } - - @Override - public void close() throws IOException { - leaderSelector.close(); - } - - @Override - public void takeLeadership(CuratorFramework client) throws Exception { - // we are now the leader. This method should not return until we want to relinquish leadership - final int waitSeconds = (int) (5 * Math.random()) + 1; - - logger.info(name + " is now the leader. Waiting " + waitSeconds + " seconds..."); - logger.info(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before."); - RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer = new RabbitMQTaskLaunchConsumer(); - String listenId = rabbitMQTaskLaunchConsumer.listen(new TaskLaunchMessageHandler()); - - TaskSubmitEvent event = taskSubmitEvents.take(); - try { - GFacUtils.createExperimentEntryForRPC(event.getExperimentId(),event.getTaskId(),client.getZookeeperClient().getZooKeeper(),experimentNode,name,event.getTokenId()); - submitJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId()); - Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds)); - } catch (InterruptedException e) { - logger.error(name + " was interrupted."); - Thread.currentThread().interrupt(); - } finally { - Thread.sleep(5); - logger.info(name + " relinquishing leadership.: "+ new Date().toString()); - rabbitMQTaskLaunchConsumer.stopListen(listenId); - } - } - } - } http://git-wip-us.apache.org/repos/asf/airavata/blob/840e627b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java index 570b17f..fffeece 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java @@ -113,8 +113,10 @@ public class RabbitMQProducer { log.info("setting basic.qos / prefetch count to " + prefetchCount + " for " + exchangeName); channel.basicQos(prefetchCount); } - channel.exchangeDeclare(exchangeName, getExchangeType, false); - } catch (Exception e) { + if(exchangeName!=null) { + channel.exchangeDeclare(exchangeName, getExchangeType, false); + } + } catch (Exception e) { reset(); String msg = "could not open channel for exchange " + exchangeName; log.error(msg); @@ -132,6 +134,18 @@ public class RabbitMQProducer { } } + public void sendToWorkerQueue(byte []message, String routingKey) throws Exception { + try { + channel.basicPublish( "", routingKey, + MessageProperties.PERSISTENT_TEXT_PLAIN, + message); + } catch (IOException e) { + String msg = "Failed to publish message to exchange: " + exchangeName; + log.error(msg, e); + throw new Exception(msg, e); + } + } + private Connection createConnection() throws IOException { try { ConnectionFactory connectionFactory = new ConnectionFactory(); http://git-wip-us.apache.org/repos/asf/airavata/blob/840e627b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java index 4bc7468..1c7b0e8 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java @@ -24,6 +24,7 @@ import com.rabbitmq.client.*; import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.common.utils.AiravataUtils; +import org.apache.airavata.common.utils.AiravataZKUtils; import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.common.utils.ThriftUtils; import org.apache.airavata.messaging.core.MessageContext; @@ -82,7 +83,7 @@ public class RabbitMQTaskLaunchConsumer { log.info("connected to rabbitmq: " + connection + " for " + taskLaunchExchangeName); channel = connection.createChannel(); - channel.exchangeDeclare(taskLaunchExchangeName, "fanout"); +// channel.exchangeDeclare(taskLaunchExchangeName, "fanout"); } catch (Exception e) { String msg = "could not open channel for exchange " + taskLaunchExchangeName; @@ -98,7 +99,6 @@ public class RabbitMQTaskLaunchConsumer { if (routing == null) { throw new IllegalArgumentException("The routing key must be present"); } - List<String> keys = new ArrayList<String>(); if (routing instanceof List) { for (Object o : (List)routing) { @@ -113,7 +113,7 @@ public class RabbitMQTaskLaunchConsumer { if (queueName == null) { if (!channel.isOpen()) { channel = connection.createChannel(); - channel.exchangeDeclare(taskLaunchExchangeName, "fanout"); +// channel.exchangeDeclare(taskLaunchExchangeName, "fanout"); } queueName = channel.queueDeclare().getQueue(); } else { @@ -131,11 +131,11 @@ public class RabbitMQTaskLaunchConsumer { } // bind all the routing keys - for (String routingKey : keys) { - channel.queueBind(queueName, taskLaunchExchangeName, routingKey); - } - - channel.basicConsume(queueName, true, consumerTag, new DefaultConsumer(channel) { +// for (String routingKey : keys) { +// channel.queueBind(queueName, taskLaunchExchangeName, routingKey); +// } + // autoAck=false, we will ack after task is done + channel.basicConsume(queueName, false, consumerTag, new QueueingConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, @@ -147,6 +147,7 @@ public class RabbitMQTaskLaunchConsumer { ThriftUtils.createThriftFromBytes(body, message); TBase event = null; String gatewayId = null; + long deliveryTag = envelope.getDeliveryTag(); //todo store this in zookeeper, once job is done we can ack if(message.getMessageType().equals(MessageType.LAUNCHTASK)) { TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent(); ThriftUtils.createThriftFromBytes(message.getEvent(), taskSubmitEvent); @@ -167,6 +168,11 @@ public class RabbitMQTaskLaunchConsumer { MessageContext messageContext = new MessageContext(event, message.getMessageType(), message.getMessageId(), gatewayId); messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime())); handler.onMessage(messageContext); + try { + channel.basicAck(deliveryTag,false); //todo move this logic to monitoring component to ack when the job is done + } catch (IOException e) { + logger.error(e.getMessage(), e); + } } catch (TException e) { String msg = "Failed to de-serialize the thrift message, from routing keys and queueName " + id; log.warn(msg, e); http://git-wip-us.apache.org/repos/asf/airavata/blob/840e627b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java index 23b2379..0f95fbf 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java @@ -20,6 +20,7 @@ */ package org.apache.airavata.messaging.core.impl; +import com.rabbitmq.client.MessageProperties; import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.common.utils.ServerSettings; @@ -50,7 +51,7 @@ public class RabbitMQTaskLaunchPublisher implements Publisher{ log.error(message, e); throw new AiravataException(message, e); } - rabbitMQProducer = new RabbitMQProducer(brokerUrl, exchangeName,"fanout"); + rabbitMQProducer = new RabbitMQProducer(brokerUrl, null,null); rabbitMQProducer.open(); } @@ -70,7 +71,7 @@ public class RabbitMQTaskLaunchPublisher implements Publisher{ routingKey = TERMINATE_TASK; } byte[] messageBody = ThriftUtils.serializeThriftObject(message); - rabbitMQProducer.send(messageBody, routingKey); + rabbitMQProducer.sendToWorkerQueue(messageBody, routingKey); } catch (TException e) { String msg = "Error while deserializing the object"; log.error(msg, e);
