update tanent event publishers with MQTT support

Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/d789aa66
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/d789aa66
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/d789aa66

Branch: refs/heads/master
Commit: d789aa66ca55b9fda0662865fce818c390a4f043
Parents: 4598778
Author: gayan <[email protected]>
Authored: Wed Sep 24 12:19:18 2014 +0530
Committer: Nirmal Fernando <[email protected]>
Committed: Wed Sep 24 16:00:10 2014 +0530

----------------------------------------------------------------------
 .../publisher/CartridgeAgentEventPublisher.java      | 15 ++++++++-------
 .../manager/CartridgeSubscriptionManager.java        |  9 ++++++---
 .../utils/CartridgeSubscriptionUtils.java            |  7 +++++--
 .../java/org/apache/stratos/messaging/util/Util.java |  3 ++-
 4 files changed, 21 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/d789aa66/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/event/publisher/CartridgeAgentEventPublisher.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/event/publisher/CartridgeAgentEventPublisher.java
 
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/event/publisher/CartridgeAgentEventPublisher.java
index c57ec2b..9eda926 100644
--- 
a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/event/publisher/CartridgeAgentEventPublisher.java
+++ 
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/event/publisher/CartridgeAgentEventPublisher.java
@@ -55,9 +55,9 @@ public class CartridgeAgentEventPublisher {
                                        
CartridgeAgentConfiguration.getInstance().getNetworkPartitionId(),
                                        
CartridgeAgentConfiguration.getInstance().getPartitionId(),
                                        
CartridgeAgentConfiguration.getInstance().getMemberId());
-
+                       String topic = Util.getMessageTopicName(event);
                        EventPublisher eventPublisher = EventPublisherPool
-                                       
.getPublisher(Constants.INSTANCE_STATUS_TOPIC);
+                                       .getPublisher(topic);
                        eventPublisher.publish(event);
                        setStarted(true);
                        if (log.isInfoEnabled()) {
@@ -84,8 +84,9 @@ public class CartridgeAgentEventPublisher {
                                        
CartridgeAgentConfiguration.getInstance().getMemberId());
 
                        // Event publisher connection will
+                       String topic = Util.getMessageTopicName(event);
                        EventPublisher eventPublisher = EventPublisherPool
-                                       
.getPublisher(Constants.INSTANCE_STATUS_TOPIC);
+                                       .getPublisher(topic);
                        eventPublisher.publish(event);
                        if (log.isInfoEnabled()) {
                                log.info("Instance activated event published");
@@ -118,9 +119,9 @@ public class CartridgeAgentEventPublisher {
                                        
CartridgeAgentConfiguration.getInstance().getNetworkPartitionId(),
                                        
CartridgeAgentConfiguration.getInstance().getPartitionId(),
                                        
CartridgeAgentConfiguration.getInstance().getMemberId());
-
+                       String topic = Util.getMessageTopicName(event);
                        EventPublisher eventPublisher = EventPublisherPool
-                                       
.getPublisher(Util.getMessageTopicName(event));
+                                       .getPublisher(topic);
                        eventPublisher.publish(event);
                        setReadyToShutdown(true);
                        if (log.isInfoEnabled()) {
@@ -144,9 +145,9 @@ public class CartridgeAgentEventPublisher {
                                        
CartridgeAgentConfiguration.getInstance().getNetworkPartitionId(),
                                        
CartridgeAgentConfiguration.getInstance().getPartitionId(),
                                        
CartridgeAgentConfiguration.getInstance().getMemberId());
-
+                       String topic = Util.getMessageTopicName(event);
                        EventPublisher eventPublisher = EventPublisherPool
-                                       
.getPublisher(Constants.INSTANCE_STATUS_TOPIC);
+                                       .getPublisher(topic);
                        eventPublisher.publish(event);
                        setMaintenance(true);
                        if (log.isInfoEnabled()) {

http://git-wip-us.apache.org/repos/asf/stratos/blob/d789aa66/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/manager/CartridgeSubscriptionManager.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/manager/CartridgeSubscriptionManager.java
 
b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/manager/CartridgeSubscriptionManager.java
index 2f8be04..2a419aa 100644
--- 
a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/manager/CartridgeSubscriptionManager.java
+++ 
b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/manager/CartridgeSubscriptionManager.java
@@ -57,6 +57,7 @@ import org.apache.stratos.messaging.domain.topology.Member;
 import org.apache.stratos.messaging.event.tenant.SubscriptionDomainAddedEvent;
 import 
org.apache.stratos.messaging.event.tenant.SubscriptionDomainRemovedEvent;
 import org.apache.stratos.messaging.util.Constants;
+import org.apache.stratos.messaging.util.Util;
 import org.wso2.carbon.context.CarbonContext;
 import org.apache.stratos.manager.publisher.CartridgeSubscriptionDataPublisher;
 
@@ -386,13 +387,13 @@ public class CartridgeSubscriptionManager {
         log.info("Successfully added domains to cartridge subscription: 
[tenant-id] " + tenantId + " [subscription-alias] " + subscriptionAlias +
                 " [domain-name] " + domainName + " [application-context] " 
+applicationContext);
 
-        EventPublisher eventPublisher = 
EventPublisherPool.getPublisher(Constants.TENANT_TOPIC);
 
         Set<String> clusterIds = new HashSet<String>();
         clusterIds.add(cartridgeSubscription.getCluster().getClusterDomain());
         SubscriptionDomainAddedEvent event = new 
SubscriptionDomainAddedEvent(tenantId, cartridgeSubscription.getType(),
                 clusterIds, domainName, applicationContext);
-
+           String topic = Util.getMessageTopicName(event);
+           EventPublisher eventPublisher = 
EventPublisherPool.getPublisher(topic);
         eventPublisher.publish(event);
     }
 
@@ -416,12 +417,14 @@ public class CartridgeSubscriptionManager {
         log.info("Successfully removed domain from cartridge subscription: 
[tenant-id] " + tenantId + " [subscription-alias] " + subscriptionAlias +
                 " [domain-name] " + domainName);
 
-        EventPublisher eventPublisher = 
EventPublisherPool.getPublisher(Constants.TENANT_TOPIC);
+
 
         Set<String> clusterIds = new HashSet<String>();
         clusterIds.add(cartridgeSubscription.getCluster().getClusterDomain());
         SubscriptionDomainRemovedEvent event = new 
SubscriptionDomainRemovedEvent(tenantId, cartridgeSubscription.getType(),
                 clusterIds, domainName);
+           String topic = Util.getMessageTopicName(event);
+           EventPublisher eventPublisher = 
EventPublisherPool.getPublisher(topic);
         eventPublisher.publish(event);
     }
 

http://git-wip-us.apache.org/repos/asf/stratos/blob/d789aa66/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/utils/CartridgeSubscriptionUtils.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/utils/CartridgeSubscriptionUtils.java
 
b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/utils/CartridgeSubscriptionUtils.java
index 39f674b..6a45995 100644
--- 
a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/utils/CartridgeSubscriptionUtils.java
+++ 
b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/utils/CartridgeSubscriptionUtils.java
@@ -43,6 +43,7 @@ import 
org.apache.stratos.messaging.broker.publish.EventPublisherPool;
 import org.apache.stratos.messaging.event.tenant.TenantSubscribedEvent;
 import org.apache.stratos.messaging.event.tenant.TenantUnSubscribedEvent;
 import org.apache.stratos.messaging.util.Constants;
+import org.apache.stratos.messaging.util.Util;
 
 import java.util.Set;
 import java.util.concurrent.Executor;
@@ -170,7 +171,8 @@ public class CartridgeSubscriptionUtils {
                     log.info(String.format("Publishing tenant subscribed 
event: [tenant-id] %d [service] %s", tenantId, serviceName));
                 }
                 TenantSubscribedEvent subscribedEvent = new 
TenantSubscribedEvent(tenantId, serviceName, clusterIds);
-                EventPublisher eventPublisher = 
EventPublisherPool.getPublisher(Constants.TENANT_TOPIC);
+                String topic = Util.getMessageTopicName(subscribedEvent);
+                EventPublisher eventPublisher = 
EventPublisherPool.getPublisher(topic);
                 eventPublisher.publish(subscribedEvent);
             } catch (Exception e) {
                 if (log.isErrorEnabled()) {
@@ -201,7 +203,8 @@ public class CartridgeSubscriptionUtils {
                 log.info(String.format("Publishing tenant un-subscribed event: 
[tenant-id] %d [service] %s", tenantId, serviceName));
             }
             TenantUnSubscribedEvent event = new 
TenantUnSubscribedEvent(tenantId, serviceName, clusterIds);
-            EventPublisher eventPublisher = 
EventPublisherPool.getPublisher(Constants.TENANT_TOPIC);
+               String topic = Util.getMessageTopicName(event);
+            EventPublisher eventPublisher = 
EventPublisherPool.getPublisher(topic);
             eventPublisher.publish(event);
         } catch (Exception e) {
             if (log.isErrorEnabled()) {

http://git-wip-us.apache.org/repos/asf/stratos/blob/d789aa66/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Util.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Util.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Util.java
index a6db9ee..168ebe3 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Util.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Util.java
@@ -32,6 +32,7 @@ import org.apache.stratos.messaging.message.JsonMessage;
 
 public class Util {
        private static final Log log = LogFactory.getLog(Util.class);
+       public static final int BEGIN_INDEX = 35;
 
        public static Properties getProperties(String filePath) {
                Properties props = new Properties();
@@ -157,7 +158,7 @@ public class Util {
        }
 
        public static String getMessageTopicName(Event event) {
-               return event.getClass().getName().substring(35).replace(".", 
"/");
+               return 
event.getClass().getName().substring(BEGIN_INDEX).replace(".", "/");
        }
 
        public static String getEventNameForTopic(String arg0) {

Reply via email to