updates
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/c5497cdf Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/c5497cdf Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/c5497cdf Branch: refs/heads/docker-integration Commit: c5497cdf9cab23c19d5cc2280b81b079a9aa2268 Parents: bc78b9d Author: gayan <[email protected]> Authored: Mon Sep 22 17:26:08 2014 +0530 Committer: Nirmal Fernando <[email protected]> Committed: Tue Sep 23 18:23:25 2014 +0530 ---------------------------------------------------------------------- .../publisher/CartridgeAgentEventPublisher.java | 352 +++++++++---------- .../InstanceStatusEventMessageListener.java | 64 ++-- 2 files changed, 210 insertions(+), 206 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/c5497cdf/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/event/publisher/CartridgeAgentEventPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/event/publisher/CartridgeAgentEventPublisher.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/event/publisher/CartridgeAgentEventPublisher.java index e5e6a4e..46c250b 100644 --- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/event/publisher/CartridgeAgentEventPublisher.java +++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/event/publisher/CartridgeAgentEventPublisher.java @@ -37,180 +37,180 @@ import org.apache.stratos.messaging.util.Util; * Cartridge agent event publisher. */ public class CartridgeAgentEventPublisher { - private static final Log log = LogFactory.getLog(CartridgeAgentEventPublisher.class); - private static boolean started; - private static boolean activated; - private static boolean readyToShutdown; - private static boolean maintenance; - - public static void publishInstanceStartedEvent() { - if (!isStarted()) { - if (log.isInfoEnabled()) { - log.info("Publishing instance started event"); - } - InstanceStartedEvent event = - new InstanceStartedEvent( - CartridgeAgentConfiguration.getInstance() - .getServiceName(), - CartridgeAgentConfiguration.getInstance() - .getClusterId(), - CartridgeAgentConfiguration.getInstance() - .getNetworkPartitionId(), - CartridgeAgentConfiguration.getInstance() - .getPartitionId(), - CartridgeAgentConfiguration.getInstance() - .getMemberId()); - - EventPublisher eventPublisher = - EventPublisherPool.getPublisher(Constants.INSTANCE_STATUS_TOPIC); - eventPublisher.publish(event); - setStarted(true); - if (log.isInfoEnabled()) { - log.info("Instance started event published"); - } - - } else { - if (log.isWarnEnabled()) { - log.warn("Instance already started"); - } - } - } - - public static void publishInstanceActivatedEvent() { - if (!isActivated()) { - if (log.isInfoEnabled()) { - log.info("Publishing instance activated event"); - } - InstanceActivatedEvent event = - new InstanceActivatedEvent( - CartridgeAgentConfiguration.getInstance() - .getServiceName(), - CartridgeAgentConfiguration.getInstance() - .getClusterId(), - CartridgeAgentConfiguration.getInstance() - .getNetworkPartitionId(), - CartridgeAgentConfiguration.getInstance() - .getPartitionId(), - CartridgeAgentConfiguration.getInstance() - .getMemberId()); - - // Event publisher connection will - EventPublisher eventPublisher = - EventPublisherPool.getPublisher(Constants.INSTANCE_STATUS_TOPIC); - eventPublisher.publish(event); - if (log.isInfoEnabled()) { - log.info("Instance activated event published"); - } - - if (log.isInfoEnabled()) { - log.info("Starting health statistics notifier"); - } - Thread thread = new Thread(new HealthStatisticsNotifier()); - thread.start(); - setActivated(true); - if (log.isInfoEnabled()) { - log.info("Health statistics notifier started"); - } - } else { - if (log.isWarnEnabled()) { - log.warn("Instance already activated"); - } - } - } - - public static void publishInstanceReadyToShutdownEvent() { - if (!isReadyToShutdown()) { - if (log.isInfoEnabled()) { - log.info("Publishing instance activated event"); - } - InstanceReadyToShutdownEvent event = - new InstanceReadyToShutdownEvent( - CartridgeAgentConfiguration.getInstance() - .getServiceName(), - CartridgeAgentConfiguration.getInstance() - .getClusterId(), - CartridgeAgentConfiguration.getInstance() - .getNetworkPartitionId(), - CartridgeAgentConfiguration.getInstance() - .getPartitionId(), - CartridgeAgentConfiguration.getInstance() - .getMemberId()); - - EventPublisher eventPublisher = - EventPublisherPool.getPublisher(Util.getMessageTopicName(event)); - eventPublisher.publish(event); - setReadyToShutdown(true); - if (log.isInfoEnabled()) { - log.info("Instance ReadyToShutDown event published"); - } - } else { - if (log.isWarnEnabled()) { - log.warn("Instance already sent ReadyToShutDown event...."); - } - } - } - - public static void publishMaintenanceModeEvent() { - if (!isMaintenance()) { - if (log.isInfoEnabled()) { - log.info("Publishing instance maintenance mode event"); - } - InstanceMaintenanceModeEvent event = - new InstanceMaintenanceModeEvent( - CartridgeAgentConfiguration.getInstance() - .getServiceName(), - CartridgeAgentConfiguration.getInstance() - .getClusterId(), - CartridgeAgentConfiguration.getInstance() - .getNetworkPartitionId(), - CartridgeAgentConfiguration.getInstance() - .getPartitionId(), - CartridgeAgentConfiguration.getInstance() - .getMemberId()); - - EventPublisher eventPublisher = - EventPublisherPool.getPublisher(Constants.INSTANCE_STATUS_TOPIC); - eventPublisher.publish(event); - setMaintenance(true); - if (log.isInfoEnabled()) { - log.info("Instance Maintenance mode event published"); - } - } else { - if (log.isWarnEnabled()) { - log.warn("Instance already in a Maintenance mode...."); - } - } - } - - public static boolean isStarted() { - return started; - } - - public static void setStarted(boolean started) { - CartridgeAgentEventPublisher.started = started; - } - - public static boolean isActivated() { - return activated; - } - - public static void setActivated(boolean activated) { - CartridgeAgentEventPublisher.activated = activated; - } - - public static boolean isReadyToShutdown() { - return readyToShutdown; - } - - public static void setReadyToShutdown(boolean readyToShutdown) { - CartridgeAgentEventPublisher.readyToShutdown = readyToShutdown; - } - - public static boolean isMaintenance() { - return maintenance; - } - - public static void setMaintenance(boolean maintenance) { - CartridgeAgentEventPublisher.maintenance = maintenance; - } + private static final Log log = LogFactory.getLog(CartridgeAgentEventPublisher.class); + private static boolean started; + private static boolean activated; + private static boolean readyToShutdown; + private static boolean maintenance; + + public static void publishInstanceStartedEvent() { + if (!isStarted()) { + if (log.isInfoEnabled()) { + log.info("Publishing instance started event"); + } + InstanceStartedEvent event = + new InstanceStartedEvent( + CartridgeAgentConfiguration.getInstance() + .getServiceName(), + CartridgeAgentConfiguration.getInstance() + .getClusterId(), + CartridgeAgentConfiguration.getInstance() + .getNetworkPartitionId(), + CartridgeAgentConfiguration.getInstance() + .getPartitionId(), + CartridgeAgentConfiguration.getInstance() + .getMemberId()); + + EventPublisher eventPublisher = + EventPublisherPool.getPublisher(Constants.INSTANCE_STATUS_TOPIC); + eventPublisher.publish(event); + setStarted(true); + if (log.isInfoEnabled()) { + log.info("Instance started event published"); + } + + } else { + if (log.isWarnEnabled()) { + log.warn("Instance already started"); + } + } + } + + public static void publishInstanceActivatedEvent() { + if (!isActivated()) { + if (log.isInfoEnabled()) { + log.info("Publishing instance activated event"); + } + InstanceActivatedEvent event = + new InstanceActivatedEvent( + CartridgeAgentConfiguration.getInstance() + .getServiceName(), + CartridgeAgentConfiguration.getInstance() + .getClusterId(), + CartridgeAgentConfiguration.getInstance() + .getNetworkPartitionId(), + CartridgeAgentConfiguration.getInstance() + .getPartitionId(), + CartridgeAgentConfiguration.getInstance() + .getMemberId()); + + // Event publisher connection will + EventPublisher eventPublisher = + EventPublisherPool.getPublisher(Constants.INSTANCE_STATUS_TOPIC); + eventPublisher.publish(event); + if (log.isInfoEnabled()) { + log.info("Instance activated event published"); + } + + if (log.isInfoEnabled()) { + log.info("Starting health statistics notifier"); + } + Thread thread = new Thread(new HealthStatisticsNotifier()); + thread.start(); + setActivated(true); + if (log.isInfoEnabled()) { + log.info("Health statistics notifier started"); + } + } else { + if (log.isWarnEnabled()) { + log.warn("Instance already activated"); + } + } + } + + public static void publishInstanceReadyToShutdownEvent() { + if (!isReadyToShutdown()) { + if (log.isInfoEnabled()) { + log.info("Publishing instance activated event"); + } + InstanceReadyToShutdownEvent event = + new InstanceReadyToShutdownEvent( + CartridgeAgentConfiguration.getInstance() + .getServiceName(), + CartridgeAgentConfiguration.getInstance() + .getClusterId(), + CartridgeAgentConfiguration.getInstance() + .getNetworkPartitionId(), + CartridgeAgentConfiguration.getInstance() + .getPartitionId(), + CartridgeAgentConfiguration.getInstance() + .getMemberId()); + + EventPublisher eventPublisher = + EventPublisherPool.getPublisher(Util.getMessageTopicName(event)); + eventPublisher.publish(event); + setReadyToShutdown(true); + if (log.isInfoEnabled()) { + log.info("Instance ReadyToShutDown event published"); + } + } else { + if (log.isWarnEnabled()) { + log.warn("Instance already sent ReadyToShutDown event...."); + } + } + } + + public static void publishMaintenanceModeEvent() { + if (!isMaintenance()) { + if (log.isInfoEnabled()) { + log.info("Publishing instance maintenance mode event"); + } + InstanceMaintenanceModeEvent event = + new InstanceMaintenanceModeEvent( + CartridgeAgentConfiguration.getInstance() + .getServiceName(), + CartridgeAgentConfiguration.getInstance() + .getClusterId(), + CartridgeAgentConfiguration.getInstance() + .getNetworkPartitionId(), + CartridgeAgentConfiguration.getInstance() + .getPartitionId(), + CartridgeAgentConfiguration.getInstance() + .getMemberId()); + + EventPublisher eventPublisher = + EventPublisherPool.getPublisher(Constants.INSTANCE_STATUS_TOPIC); + eventPublisher.publish(event); + setMaintenance(true); + if (log.isInfoEnabled()) { + log.info("Instance Maintenance mode event published"); + } + } else { + if (log.isWarnEnabled()) { + log.warn("Instance already in a Maintenance mode...."); + } + } + } + + public static boolean isStarted() { + return started; + } + + public static void setStarted(boolean started) { + CartridgeAgentEventPublisher.started = started; + } + + public static boolean isActivated() { + return activated; + } + + public static void setActivated(boolean activated) { + CartridgeAgentEventPublisher.activated = activated; + } + + public static boolean isReadyToShutdown() { + return readyToShutdown; + } + + public static void setReadyToShutdown(boolean readyToShutdown) { + CartridgeAgentEventPublisher.readyToShutdown = readyToShutdown; + } + + public static boolean isMaintenance() { + return maintenance; + } + + public static void setMaintenance(boolean maintenance) { + CartridgeAgentEventPublisher.maintenance = maintenance; + } } http://git-wip-us.apache.org/repos/asf/stratos/blob/c5497cdf/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageListener.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageListener.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageListener.java index 9d2833e..5f415b5 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageListener.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageListener.java @@ -33,43 +33,47 @@ import org.eclipse.paho.client.mqttv3.MqttMessage; * this is to handle the topology subscription */ public class InstanceStatusEventMessageListener implements MqttCallback { - private static final Log log = LogFactory.getLog(InstanceStatusEventMessageListener.class); + public static final String ORG_APACHE_STRATOS_MESSAGING_EVENT = "org.apache.stratos.messaging.event."; + private static final Log log = LogFactory.getLog(InstanceStatusEventMessageListener.class); - @Override - public void connectionLost(Throwable arg0) { - // TODO Auto-generated method stub + @Override + public void connectionLost(Throwable arg0) { + if (log.isDebugEnabled()) { + log.debug("Connection lost"); + } - } + } - @Override - public void deliveryComplete(IMqttDeliveryToken arg0) { - // TODO Auto-generated method stub + @Override + public void deliveryComplete(IMqttDeliveryToken arg0) { + if (log.isDebugEnabled()) { + log.debug("Delivery completed"); + } + } - } + @Override + public void messageArrived(String arg0, MqttMessage message) throws Exception { + if (message instanceof MqttMessage) { - @Override - public void messageArrived(String arg0, MqttMessage message) throws Exception { - if (message instanceof MqttMessage) { + TextMessage receivedMessage = new ActiveMQTextMessage(); - TextMessage receivedMessage = new ActiveMQTextMessage(); - System.out.println("instance notifier messege received...."); - receivedMessage.setText(new String(message.getPayload())); - receivedMessage.setStringProperty(Constants.EVENT_CLASS_NAME, - "org.apache.stratos.messaging.event.".concat(arg0.replace("/", - "."))); + receivedMessage.setText(new String(message.getPayload())); + receivedMessage.setStringProperty(Constants.EVENT_CLASS_NAME, + ORG_APACHE_STRATOS_MESSAGING_EVENT.concat(arg0.replace("/", + "."))); - try { - if (log.isDebugEnabled()) { - log.debug(String.format("Instance notifier message received: %s", - ((TextMessage) message).getText())); - } - // Add received message to the queue - InstanceStatusEventMessageQueue.getInstance().add(receivedMessage); + try { + if (log.isDebugEnabled()) { + log.debug(String.format("Instance notifier message received: %s", + ((TextMessage) message).getText())); + } + // Add received message to the queue + InstanceStatusEventMessageQueue.getInstance().add(receivedMessage); - } catch (JMSException e) { - log.error(e.getMessage(), e); - } - } + } catch (JMSException e) { + log.error(e.getMessage(), e); + } + } - } + } }
