Updated Branches:
  refs/heads/master 7c2f7b6b6 -> 258ebf0af

Fixed tenant receiver logic in load balancer and updated adc mgr


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/258ebf0a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/258ebf0a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/258ebf0a

Branch: refs/heads/master
Commit: 258ebf0afb9d1cb882dadf6936063ec26b5a46f8
Parents: 7c2f7b6
Author: Imesh Gunaratne <[email protected]>
Authored: Wed Dec 18 14:36:19 2013 +0530
Committer: Imesh Gunaratne <[email protected]>
Committed: Wed Dec 18 14:36:19 2013 +0530

----------------------------------------------------------------------
 .../manager/CartridgeSubscriptionManager.java   |  9 ++++-
 .../service/ApplicationManagementService.java   | 35 ++----------------
 .../utils/CartridgeSubscriptionUtils.java       | 34 ++++++++++++++++++
 .../balancer/LoadBalancerTenantReceiver.java    | 37 +++++++++++++++++---
 .../TopologyFilterConfigurator.java             |  9 +++++
 .../internal/LoadBalancerServiceComponent.java  | 11 ++++++
 6 files changed, 98 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/258ebf0a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/manager/CartridgeSubscriptionManager.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/manager/CartridgeSubscriptionManager.java
 
b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/manager/CartridgeSubscriptionManager.java
index 45181ed..600e5cc 100644
--- 
a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/manager/CartridgeSubscriptionManager.java
+++ 
b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/manager/CartridgeSubscriptionManager.java
@@ -212,6 +212,10 @@ public class CartridgeSubscriptionManager {
 
         cartridgeSubscription.setPayloadData(payloadData);
 
+        // Publish tenant subscribed event to message broker
+        
CartridgeSubscriptionUtils.publishTenantSubscribedEvent(cartridgeSubscription.getSubscriber().getTenantId(),
+                cartridgeSubscription.getCartridgeInfo().getType());
+
         return cartridgeSubscription;
     }
 
@@ -333,7 +337,10 @@ public class CartridgeSubscriptionManager {
 
         if(cartridgeSubscription != null) {
             cartridgeSubscription.removeSubscription();
-            
//CartridgeInstanceCache.getCartridgeInstanceCache().removeCartridgeInstance(cartridgeInstanceCacheKey);
+
+            // Publish tenant un-subscribed event to message broker
+            
CartridgeSubscriptionUtils.publishTenantUnSubscribedEvent(cartridgeSubscription.getSubscriber().getTenantId(),
+                    cartridgeSubscription.getCartridgeInfo().getType());
         }
         else {
             if(log.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/258ebf0a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/service/ApplicationManagementService.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/service/ApplicationManagementService.java
 
b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/service/ApplicationManagementService.java
index 8ea7326..9f67b4a 100644
--- 
a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/service/ApplicationManagementService.java
+++ 
b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/service/ApplicationManagementService.java
@@ -30,6 +30,7 @@ import org.apache.stratos.adc.mgt.exception.*;
 import org.apache.stratos.adc.mgt.subscription.CartridgeSubscription;
 import org.apache.stratos.adc.mgt.internal.DataHolder;
 import org.apache.stratos.adc.mgt.manager.CartridgeSubscriptionManager;
+import 
org.apache.stratos.adc.mgt.subscription.utils.CartridgeSubscriptionUtils;
 import org.apache.stratos.adc.mgt.utils.ApplicationManagementUtil;
 import org.apache.stratos.adc.mgt.utils.CartridgeConstants;
 import org.apache.stratos.adc.mgt.utils.PersistenceManager;
@@ -409,7 +410,7 @@ public class ApplicationManagementService extends 
AbstractAdmin {
             }
             if (connectingCartridgeSubscription != null) {
                 // Publish tenant subscribed event
-                publishTenantSubscribedEvent(getTenantId(), 
connectingCartridgeSubscription.getCartridgeInfo().getType());
+                
CartridgeSubscriptionUtils.publishTenantSubscribedEvent(getTenantId(), 
connectingCartridgeSubscription.getCartridgeInfo().getType());
 
                 try {
                     
cartridgeSubsciptionManager.connectCartridges(getTenantDomain(), 
cartridgeSubscription,
@@ -431,36 +432,6 @@ public class ApplicationManagementService extends 
AbstractAdmin {
 
        }
 
-    private void publishTenantSubscribedEvent(int tenantId, String 
serviceName) {
-        try {
-            if(log.isInfoEnabled()) {
-                log.info(String.format("Publishing tenant subscribed event: 
[tenant-id] %d [service] %s", tenantId, serviceName));
-            }
-            TenantSubscribedEvent subscribedEvent = new 
TenantSubscribedEvent(tenantId, serviceName);
-            EventPublisher eventPublisher = new 
EventPublisher(Constants.TENANT_TOPIC);
-            eventPublisher.publish(subscribedEvent);
-        } catch (Exception e) {
-            if (log.isErrorEnabled()) {
-                log.error(String.format("Could not publish tenant subscribed 
event: [tenant-id] %d [service] %s", tenantId, serviceName), e);
-            }
-        }
-    }
-
-    private void publishTenantUnSubscribedEvent(int tenantId, String 
serviceName) {
-        try {
-            if(log.isInfoEnabled()) {
-                log.info(String.format("Publishing tenant un-subscribed event: 
[tenant-id] %d [service] %s", tenantId, serviceName));
-            }
-            TenantUnSubscribedEvent event = new 
TenantUnSubscribedEvent(tenantId, serviceName);
-            EventPublisher eventPublisher = new 
EventPublisher(Constants.TENANT_TOPIC);
-            eventPublisher.publish(event);
-        } catch (Exception e) {
-            if (log.isErrorEnabled()) {
-                log.error(String.format("Could not publish tenant 
un-subscribed event: [tenant-id] %d [service] %s", tenantId, serviceName), e);
-            }
-        }
-    }
-
     /**
      * Unsubscribing the cartridge
      *
@@ -487,7 +458,7 @@ public class ApplicationManagementService extends 
AbstractAdmin {
 
         // Publish tenant un-subscribed event
         String serviceName = subscription.getCartridge();
-        publishTenantUnSubscribedEvent(getTenantId(), serviceName);
+        
CartridgeSubscriptionUtils.publishTenantUnSubscribedEvent(getTenantId(), 
serviceName);
 
 
         /*CartridgeSubscriptionInfo subscription = null;

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/258ebf0a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/subscription/utils/CartridgeSubscriptionUtils.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/subscription/utils/CartridgeSubscriptionUtils.java
 
b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/subscription/utils/CartridgeSubscriptionUtils.java
index 66510a5..5a0dee9 100644
--- 
a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/subscription/utils/CartridgeSubscriptionUtils.java
+++ 
b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/subscription/utils/CartridgeSubscriptionUtils.java
@@ -26,6 +26,10 @@ import org.apache.stratos.adc.mgt.deploy.service.Service;
 import org.apache.stratos.adc.mgt.payload.BasicPayloadData;
 import org.apache.stratos.adc.mgt.subscription.CartridgeSubscription;
 import org.apache.stratos.cloud.controller.pojo.CartridgeInfo;
+import org.apache.stratos.messaging.broker.publish.EventPublisher;
+import org.apache.stratos.messaging.event.tenant.TenantSubscribedEvent;
+import org.apache.stratos.messaging.event.tenant.TenantUnSubscribedEvent;
+import org.apache.stratos.messaging.util.Constants;
 
 public class CartridgeSubscriptionUtils {
 
@@ -95,4 +99,34 @@ public class CartridgeSubscriptionUtils {
         log.info("Generated key  : " + key); // TODO -- remove the log
         return key;
     }
+
+    public static void publishTenantSubscribedEvent(int tenantId, String 
serviceName) {
+        try {
+            if(log.isInfoEnabled()) {
+                log.info(String.format("Publishing tenant subscribed event: 
[tenant-id] %d [service] %s", tenantId, serviceName));
+            }
+            TenantSubscribedEvent subscribedEvent = new 
TenantSubscribedEvent(tenantId, serviceName);
+            EventPublisher eventPublisher = new 
EventPublisher(Constants.TENANT_TOPIC);
+            eventPublisher.publish(subscribedEvent);
+        } catch (Exception e) {
+            if (log.isErrorEnabled()) {
+                log.error(String.format("Could not publish tenant subscribed 
event: [tenant-id] %d [service] %s", tenantId, serviceName), e);
+            }
+        }
+    }
+
+    public static void publishTenantUnSubscribedEvent(int tenantId, String 
serviceName) {
+        try {
+            if(log.isInfoEnabled()) {
+                log.info(String.format("Publishing tenant un-subscribed event: 
[tenant-id] %d [service] %s", tenantId, serviceName));
+            }
+            TenantUnSubscribedEvent event = new 
TenantUnSubscribedEvent(tenantId, serviceName);
+            EventPublisher eventPublisher = new 
EventPublisher(Constants.TENANT_TOPIC);
+            eventPublisher.publish(event);
+        } catch (Exception e) {
+            if (log.isErrorEnabled()) {
+                log.error(String.format("Could not publish tenant 
un-subscribed event: [tenant-id] %d [service] %s", tenantId, serviceName), e);
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/258ebf0a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTenantReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTenantReceiver.java
 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTenantReceiver.java
index bcdbd9e..48ba986 100644
--- 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTenantReceiver.java
+++ 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTenantReceiver.java
@@ -25,6 +25,7 @@ import 
org.apache.stratos.load.balancer.context.LoadBalancerContext;
 import org.apache.stratos.messaging.domain.tenant.Tenant;
 import org.apache.stratos.messaging.domain.topology.Cluster;
 import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.domain.topology.ServiceType;
 import org.apache.stratos.messaging.event.Event;
 import org.apache.stratos.messaging.event.tenant.CompleteTenantEvent;
 import org.apache.stratos.messaging.event.tenant.TenantSubscribedEvent;
@@ -68,7 +69,9 @@ public class LoadBalancerTenantReceiver implements Runnable {
                 CompleteTenantEvent completeTenantEvent = 
(CompleteTenantEvent) event;
                 for (Tenant tenant : completeTenantEvent.getTenants()) {
                     for (String serviceName : 
tenant.getServiceSubscriptions()) {
-                        addTenantSubscriptionToLbContext(serviceName, 
tenant.getTenantId());
+                        if(isMultiTenantService(serviceName)) {
+                            addTenantSubscriptionToLbContext(serviceName, 
tenant.getTenantId());
+                        }
                     }
                 }
             }
@@ -77,19 +80,45 @@ public class LoadBalancerTenantReceiver implements Runnable 
{
             @Override
             protected void onEvent(Event event) {
                 TenantSubscribedEvent tenantSubscribedEvent = 
(TenantSubscribedEvent) event;
-                
addTenantSubscriptionToLbContext(tenantSubscribedEvent.getServiceName(), 
tenantSubscribedEvent.getTenantId());
+                if(log.isDebugEnabled()) {
+                    log.debug(String.format("Tenant subscribed event received: 
[tenant-id] %d [service] %s",
+                            tenantSubscribedEvent.getTenantId(), 
tenantSubscribedEvent.getServiceName()));
+                }
+                
if(isMultiTenantService(tenantSubscribedEvent.getServiceName())) {
+                    
addTenantSubscriptionToLbContext(tenantSubscribedEvent.getServiceName(), 
tenantSubscribedEvent.getTenantId());
+                }
             }
         });
         messageProcessorChain.addEventListener(new 
TenantUnSubscribedEventListener() {
             @Override
             protected void onEvent(Event event) {
                 TenantUnSubscribedEvent tenantUnSubscribedEvent = 
(TenantUnSubscribedEvent) event;
-                
removeTenantSubscriptionFromLbContext(tenantUnSubscribedEvent.getServiceName(), 
tenantUnSubscribedEvent.getTenantId());
+                if(log.isDebugEnabled()) {
+                    log.debug(String.format("Tenant un-subscribed event 
received: [tenant-id] %d [service] %s",
+                            tenantUnSubscribedEvent.getTenantId(), 
tenantUnSubscribedEvent.getServiceName()));
+                }
+                
if(isMultiTenantService(tenantUnSubscribedEvent.getServiceName())) {
+                    
removeTenantSubscriptionFromLbContext(tenantUnSubscribedEvent.getServiceName(), 
tenantUnSubscribedEvent.getTenantId());
+                }
             }
         });
         return messageProcessorChain;
     }
 
+    private boolean isMultiTenantService(String serviceName) {
+        try {
+            TopologyManager.acquireReadLock();
+            Service service = 
TopologyManager.getTopology().getService(serviceName);
+            if(service != null) {
+                return (service.getServiceType() == ServiceType.MultiTenant);
+            }
+            return false;
+        }
+        finally {
+            TopologyManager.releaseReadLock();
+        }
+    }
+
     private void addTenantSubscriptionToLbContext(String serviceName, int 
tenantId) {
         // Find cluster of tenant
         Cluster cluster = findCluster(serviceName, tenantId);
@@ -105,7 +134,7 @@ public class LoadBalancerTenantReceiver implements Runnable 
{
                     clusterMap.put(tenantId, cluster);
                 }
                 if (log.isDebugEnabled()) {
-                    log.debug(String.format("Cluster added to multi-tenant 
clusters map: [host-name] %s [tenant-id] %d [cluster] %s",
+                    log.debug(String.format("Cluster added to multi-tenant 
cluster map: [host-name] %s [tenant-id] %d [cluster] %s",
                             hostName, tenantId, cluster.getClusterId()));
                 }
             }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/258ebf0a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/configurator/TopologyFilterConfigurator.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/configurator/TopologyFilterConfigurator.java
 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/configurator/TopologyFilterConfigurator.java
index e9b6326..59d8c2f 100644
--- 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/configurator/TopologyFilterConfigurator.java
+++ 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/configurator/TopologyFilterConfigurator.java
@@ -19,6 +19,8 @@
 
 package org.apache.stratos.load.balancer.conf.configurator;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.load.balancer.conf.LoadBalancerConfiguration;
 import org.apache.stratos.messaging.util.Constants;
 
@@ -26,10 +28,17 @@ import org.apache.stratos.messaging.util.Constants;
  * Topology filter configurator to configure topology filters.
  */
 public class TopologyFilterConfigurator {
+    private static final Log log = 
LogFactory.getLog(TopologyFilterConfigurator.class);
 
     public static void configure(LoadBalancerConfiguration configuration) {
         System.setProperty(Constants.TOPOLOGY_SERVICE_FILTER, 
configuration.getTopologyServiceFilter());
         System.setProperty(Constants.TOPOLOGY_CLUSTER_FILTER, 
configuration.getTopologyClusterFilter());
         System.setProperty(Constants.TOPOLOGY_MEMBER_FILTER, 
configuration.getTopologyMemberFilter());
+
+        if(log.isDebugEnabled()) {
+            log.debug(String.format("service filter: ", 
System.getProperty(Constants.TOPOLOGY_SERVICE_FILTER)));
+            log.debug(String.format("cluster filter: ", 
System.getProperty(Constants.TOPOLOGY_CLUSTER_FILTER)));
+            log.debug(String.format("member filter: ", 
System.getProperty(Constants.TOPOLOGY_MEMBER_FILTER)));
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/258ebf0a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
index 48387c4..f5077df 100644
--- 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
+++ 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
@@ -33,6 +33,7 @@ import 
org.apache.stratos.load.balancer.conf.configurator.JndiConfigurator;
 import org.apache.stratos.load.balancer.conf.configurator.SynapseConfigurator;
 import org.apache.stratos.load.balancer.context.LoadBalancerContext;
 import 
org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
+import 
org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
 import 
org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.synapse.config.SynapseConfiguration;
 import org.apache.synapse.config.xml.MultiXMLConfigurationBuilder;
@@ -161,6 +162,16 @@ public class LoadBalancerServiceComponent {
                         }
                         log.info(String.format("Cluster filter activated: 
[clusters] %s", sb.toString()));
                     }
+                    if (TopologyMemberFilter.getInstance().isActive()) {
+                        StringBuilder sb = new StringBuilder();
+                        for (String clusterId : 
TopologyMemberFilter.getInstance().getIncludedLbClusterIds()) {
+                            if (sb.length() > 0) {
+                                sb.append(", ");
+                            }
+                            sb.append(clusterId);
+                        }
+                        log.info(String.format("Member filter activated: 
[lb-cluster-ids] %s", sb.toString()));
+                    }
                 }
             }
 

Reply via email to