update tenant event publishers with MQTT support
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/d789aa66 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/d789aa66 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/d789aa66 Branch: refs/heads/master Commit: d789aa66ca55b9fda0662865fce818c390a4f043 Parents: 4598778 Author: gayan <[email protected]> Authored: Wed Sep 24 12:19:18 2014 +0530 Committer: Nirmal Fernando <[email protected]> Committed: Wed Sep 24 16:00:10 2014 +0530 ---------------------------------------------------------------------- .../publisher/CartridgeAgentEventPublisher.java | 15 ++++++++------- .../manager/CartridgeSubscriptionManager.java | 9 ++++++--- .../utils/CartridgeSubscriptionUtils.java | 7 +++++-- .../java/org/apache/stratos/messaging/util/Util.java | 3 ++- 4 files changed, 21 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/d789aa66/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/event/publisher/CartridgeAgentEventPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/event/publisher/CartridgeAgentEventPublisher.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/event/publisher/CartridgeAgentEventPublisher.java index c57ec2b..9eda926 100644 --- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/event/publisher/CartridgeAgentEventPublisher.java +++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/event/publisher/CartridgeAgentEventPublisher.java @@ -55,9 +55,9 @@ public class CartridgeAgentEventPublisher { 
CartridgeAgentConfiguration.getInstance().getNetworkPartitionId(), CartridgeAgentConfiguration.getInstance().getPartitionId(), CartridgeAgentConfiguration.getInstance().getMemberId()); - + String topic = Util.getMessageTopicName(event); EventPublisher eventPublisher = EventPublisherPool - .getPublisher(Constants.INSTANCE_STATUS_TOPIC); + .getPublisher(topic); eventPublisher.publish(event); setStarted(true); if (log.isInfoEnabled()) { @@ -84,8 +84,9 @@ public class CartridgeAgentEventPublisher { CartridgeAgentConfiguration.getInstance().getMemberId()); // Event publisher connection will + String topic = Util.getMessageTopicName(event); EventPublisher eventPublisher = EventPublisherPool - .getPublisher(Constants.INSTANCE_STATUS_TOPIC); + .getPublisher(topic); eventPublisher.publish(event); if (log.isInfoEnabled()) { log.info("Instance activated event published"); @@ -118,9 +119,9 @@ public class CartridgeAgentEventPublisher { CartridgeAgentConfiguration.getInstance().getNetworkPartitionId(), CartridgeAgentConfiguration.getInstance().getPartitionId(), CartridgeAgentConfiguration.getInstance().getMemberId()); - + String topic = Util.getMessageTopicName(event); EventPublisher eventPublisher = EventPublisherPool - .getPublisher(Util.getMessageTopicName(event)); + .getPublisher(topic); eventPublisher.publish(event); setReadyToShutdown(true); if (log.isInfoEnabled()) { @@ -144,9 +145,9 @@ public class CartridgeAgentEventPublisher { CartridgeAgentConfiguration.getInstance().getNetworkPartitionId(), CartridgeAgentConfiguration.getInstance().getPartitionId(), CartridgeAgentConfiguration.getInstance().getMemberId()); - + String topic = Util.getMessageTopicName(event); EventPublisher eventPublisher = EventPublisherPool - .getPublisher(Constants.INSTANCE_STATUS_TOPIC); + .getPublisher(topic); eventPublisher.publish(event); setMaintenance(true); if (log.isInfoEnabled()) { 
http://git-wip-us.apache.org/repos/asf/stratos/blob/d789aa66/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/manager/CartridgeSubscriptionManager.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/manager/CartridgeSubscriptionManager.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/manager/CartridgeSubscriptionManager.java index 2f8be04..2a419aa 100644 --- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/manager/CartridgeSubscriptionManager.java +++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/manager/CartridgeSubscriptionManager.java @@ -57,6 +57,7 @@ import org.apache.stratos.messaging.domain.topology.Member; import org.apache.stratos.messaging.event.tenant.SubscriptionDomainAddedEvent; import org.apache.stratos.messaging.event.tenant.SubscriptionDomainRemovedEvent; import org.apache.stratos.messaging.util.Constants; +import org.apache.stratos.messaging.util.Util; import org.wso2.carbon.context.CarbonContext; import org.apache.stratos.manager.publisher.CartridgeSubscriptionDataPublisher; @@ -386,13 +387,13 @@ public class CartridgeSubscriptionManager { log.info("Successfully added domains to cartridge subscription: [tenant-id] " + tenantId + " [subscription-alias] " + subscriptionAlias + " [domain-name] " + domainName + " [application-context] " +applicationContext); - EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.TENANT_TOPIC); Set<String> clusterIds = new HashSet<String>(); clusterIds.add(cartridgeSubscription.getCluster().getClusterDomain()); SubscriptionDomainAddedEvent event = new SubscriptionDomainAddedEvent(tenantId, cartridgeSubscription.getType(), clusterIds, domainName, applicationContext); - + String topic = Util.getMessageTopicName(event); + EventPublisher eventPublisher = 
EventPublisherPool.getPublisher(topic); eventPublisher.publish(event); } @@ -416,12 +417,14 @@ public class CartridgeSubscriptionManager { log.info("Successfully removed domain from cartridge subscription: [tenant-id] " + tenantId + " [subscription-alias] " + subscriptionAlias + " [domain-name] " + domainName); - EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.TENANT_TOPIC); + Set<String> clusterIds = new HashSet<String>(); clusterIds.add(cartridgeSubscription.getCluster().getClusterDomain()); SubscriptionDomainRemovedEvent event = new SubscriptionDomainRemovedEvent(tenantId, cartridgeSubscription.getType(), clusterIds, domainName); + String topic = Util.getMessageTopicName(event); + EventPublisher eventPublisher = EventPublisherPool.getPublisher(topic); eventPublisher.publish(event); } http://git-wip-us.apache.org/repos/asf/stratos/blob/d789aa66/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/utils/CartridgeSubscriptionUtils.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/utils/CartridgeSubscriptionUtils.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/utils/CartridgeSubscriptionUtils.java index 39f674b..6a45995 100644 --- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/utils/CartridgeSubscriptionUtils.java +++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/utils/CartridgeSubscriptionUtils.java @@ -43,6 +43,7 @@ import org.apache.stratos.messaging.broker.publish.EventPublisherPool; import org.apache.stratos.messaging.event.tenant.TenantSubscribedEvent; import org.apache.stratos.messaging.event.tenant.TenantUnSubscribedEvent; import org.apache.stratos.messaging.util.Constants; +import org.apache.stratos.messaging.util.Util; 
import java.util.Set; import java.util.concurrent.Executor; @@ -170,7 +171,8 @@ public class CartridgeSubscriptionUtils { log.info(String.format("Publishing tenant subscribed event: [tenant-id] %d [service] %s", tenantId, serviceName)); } TenantSubscribedEvent subscribedEvent = new TenantSubscribedEvent(tenantId, serviceName, clusterIds); - EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.TENANT_TOPIC); + String topic = Util.getMessageTopicName(subscribedEvent); + EventPublisher eventPublisher = EventPublisherPool.getPublisher(topic); eventPublisher.publish(subscribedEvent); } catch (Exception e) { if (log.isErrorEnabled()) { @@ -201,7 +203,8 @@ public class CartridgeSubscriptionUtils { log.info(String.format("Publishing tenant un-subscribed event: [tenant-id] %d [service] %s", tenantId, serviceName)); } TenantUnSubscribedEvent event = new TenantUnSubscribedEvent(tenantId, serviceName, clusterIds); - EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.TENANT_TOPIC); + String topic = Util.getMessageTopicName(event); + EventPublisher eventPublisher = EventPublisherPool.getPublisher(topic); eventPublisher.publish(event); } catch (Exception e) { if (log.isErrorEnabled()) { http://git-wip-us.apache.org/repos/asf/stratos/blob/d789aa66/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Util.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Util.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Util.java index a6db9ee..168ebe3 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Util.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Util.java @@ -32,6 +32,7 @@ import org.apache.stratos.messaging.message.JsonMessage; public 
class Util { private static final Log log = LogFactory.getLog(Util.class); + public static final int BEGIN_INDEX = 35; public static Properties getProperties(String filePath) { Properties props = new Properties(); @@ -157,7 +158,7 @@ public class Util { } public static String getMessageTopicName(Event event) { - return event.getClass().getName().substring(35).replace(".", "/"); + return event.getClass().getName().substring(BEGIN_INDEX).replace(".", "/"); } public static String getEventNameForTopic(String arg0) {
