Updated Branches: refs/heads/master 7c2f7b6b6 -> 258ebf0af
Fixed tenant receiver logic in load balancer and updated adc mgr Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/258ebf0a Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/258ebf0a Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/258ebf0a Branch: refs/heads/master Commit: 258ebf0afb9d1cb882dadf6936063ec26b5a46f8 Parents: 7c2f7b6 Author: Imesh Gunaratne <[email protected]> Authored: Wed Dec 18 14:36:19 2013 +0530 Committer: Imesh Gunaratne <[email protected]> Committed: Wed Dec 18 14:36:19 2013 +0530 ---------------------------------------------------------------------- .../manager/CartridgeSubscriptionManager.java | 9 ++++- .../service/ApplicationManagementService.java | 35 ++---------------- .../utils/CartridgeSubscriptionUtils.java | 34 ++++++++++++++++++ .../balancer/LoadBalancerTenantReceiver.java | 37 +++++++++++++++++--- .../TopologyFilterConfigurator.java | 9 +++++ .../internal/LoadBalancerServiceComponent.java | 11 ++++++ 6 files changed, 98 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/258ebf0a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/manager/CartridgeSubscriptionManager.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/manager/CartridgeSubscriptionManager.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/manager/CartridgeSubscriptionManager.java index 45181ed..600e5cc 100644 --- a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/manager/CartridgeSubscriptionManager.java +++ 
b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/manager/CartridgeSubscriptionManager.java @@ -212,6 +212,10 @@ public class CartridgeSubscriptionManager { cartridgeSubscription.setPayloadData(payloadData); + // Publish tenant subscribed event to message broker + CartridgeSubscriptionUtils.publishTenantSubscribedEvent(cartridgeSubscription.getSubscriber().getTenantId(), + cartridgeSubscription.getCartridgeInfo().getType()); + return cartridgeSubscription; } @@ -333,7 +337,10 @@ public class CartridgeSubscriptionManager { if(cartridgeSubscription != null) { cartridgeSubscription.removeSubscription(); - //CartridgeInstanceCache.getCartridgeInstanceCache().removeCartridgeInstance(cartridgeInstanceCacheKey); + + // Publish tenant un-subscribed event to message broker + CartridgeSubscriptionUtils.publishTenantUnSubscribedEvent(cartridgeSubscription.getSubscriber().getTenantId(), + cartridgeSubscription.getCartridgeInfo().getType()); } else { if(log.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/258ebf0a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/service/ApplicationManagementService.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/service/ApplicationManagementService.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/service/ApplicationManagementService.java index 8ea7326..9f67b4a 100644 --- a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/service/ApplicationManagementService.java +++ b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/service/ApplicationManagementService.java @@ -30,6 +30,7 @@ import org.apache.stratos.adc.mgt.exception.*; import org.apache.stratos.adc.mgt.subscription.CartridgeSubscription; import 
org.apache.stratos.adc.mgt.internal.DataHolder; import org.apache.stratos.adc.mgt.manager.CartridgeSubscriptionManager; +import org.apache.stratos.adc.mgt.subscription.utils.CartridgeSubscriptionUtils; import org.apache.stratos.adc.mgt.utils.ApplicationManagementUtil; import org.apache.stratos.adc.mgt.utils.CartridgeConstants; import org.apache.stratos.adc.mgt.utils.PersistenceManager; @@ -409,7 +410,7 @@ public class ApplicationManagementService extends AbstractAdmin { } if (connectingCartridgeSubscription != null) { // Publish tenant subscribed event - publishTenantSubscribedEvent(getTenantId(), connectingCartridgeSubscription.getCartridgeInfo().getType()); + CartridgeSubscriptionUtils.publishTenantSubscribedEvent(getTenantId(), connectingCartridgeSubscription.getCartridgeInfo().getType()); try { cartridgeSubsciptionManager.connectCartridges(getTenantDomain(), cartridgeSubscription, @@ -431,36 +432,6 @@ public class ApplicationManagementService extends AbstractAdmin { } - private void publishTenantSubscribedEvent(int tenantId, String serviceName) { - try { - if(log.isInfoEnabled()) { - log.info(String.format("Publishing tenant subscribed event: [tenant-id] %d [service] %s", tenantId, serviceName)); - } - TenantSubscribedEvent subscribedEvent = new TenantSubscribedEvent(tenantId, serviceName); - EventPublisher eventPublisher = new EventPublisher(Constants.TENANT_TOPIC); - eventPublisher.publish(subscribedEvent); - } catch (Exception e) { - if (log.isErrorEnabled()) { - log.error(String.format("Could not publish tenant subscribed event: [tenant-id] %d [service] %s", tenantId, serviceName), e); - } - } - } - - private void publishTenantUnSubscribedEvent(int tenantId, String serviceName) { - try { - if(log.isInfoEnabled()) { - log.info(String.format("Publishing tenant un-subscribed event: [tenant-id] %d [service] %s", tenantId, serviceName)); - } - TenantUnSubscribedEvent event = new TenantUnSubscribedEvent(tenantId, serviceName); - EventPublisher eventPublisher = 
new EventPublisher(Constants.TENANT_TOPIC); - eventPublisher.publish(event); - } catch (Exception e) { - if (log.isErrorEnabled()) { - log.error(String.format("Could not publish tenant un-subscribed event: [tenant-id] %d [service] %s", tenantId, serviceName), e); - } - } - } - /** * Unsubscribing the cartridge * @@ -487,7 +458,7 @@ public class ApplicationManagementService extends AbstractAdmin { // Publish tenant un-subscribed event String serviceName = subscription.getCartridge(); - publishTenantUnSubscribedEvent(getTenantId(), serviceName); + CartridgeSubscriptionUtils.publishTenantUnSubscribedEvent(getTenantId(), serviceName); /*CartridgeSubscriptionInfo subscription = null; http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/258ebf0a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/subscription/utils/CartridgeSubscriptionUtils.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/subscription/utils/CartridgeSubscriptionUtils.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/subscription/utils/CartridgeSubscriptionUtils.java index 66510a5..5a0dee9 100644 --- a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/subscription/utils/CartridgeSubscriptionUtils.java +++ b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/subscription/utils/CartridgeSubscriptionUtils.java @@ -26,6 +26,10 @@ import org.apache.stratos.adc.mgt.deploy.service.Service; import org.apache.stratos.adc.mgt.payload.BasicPayloadData; import org.apache.stratos.adc.mgt.subscription.CartridgeSubscription; import org.apache.stratos.cloud.controller.pojo.CartridgeInfo; +import org.apache.stratos.messaging.broker.publish.EventPublisher; +import org.apache.stratos.messaging.event.tenant.TenantSubscribedEvent; +import 
org.apache.stratos.messaging.event.tenant.TenantUnSubscribedEvent; +import org.apache.stratos.messaging.util.Constants; public class CartridgeSubscriptionUtils { @@ -95,4 +99,34 @@ public class CartridgeSubscriptionUtils { log.info("Generated key : " + key); // TODO -- remove the log return key; } + + public static void publishTenantSubscribedEvent(int tenantId, String serviceName) { + try { + if(log.isInfoEnabled()) { + log.info(String.format("Publishing tenant subscribed event: [tenant-id] %d [service] %s", tenantId, serviceName)); + } + TenantSubscribedEvent subscribedEvent = new TenantSubscribedEvent(tenantId, serviceName); + EventPublisher eventPublisher = new EventPublisher(Constants.TENANT_TOPIC); + eventPublisher.publish(subscribedEvent); + } catch (Exception e) { + if (log.isErrorEnabled()) { + log.error(String.format("Could not publish tenant subscribed event: [tenant-id] %d [service] %s", tenantId, serviceName), e); + } + } + } + + public static void publishTenantUnSubscribedEvent(int tenantId, String serviceName) { + try { + if(log.isInfoEnabled()) { + log.info(String.format("Publishing tenant un-subscribed event: [tenant-id] %d [service] %s", tenantId, serviceName)); + } + TenantUnSubscribedEvent event = new TenantUnSubscribedEvent(tenantId, serviceName); + EventPublisher eventPublisher = new EventPublisher(Constants.TENANT_TOPIC); + eventPublisher.publish(event); + } catch (Exception e) { + if (log.isErrorEnabled()) { + log.error(String.format("Could not publish tenant un-subscribed event: [tenant-id] %d [service] %s", tenantId, serviceName), e); + } + } + } } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/258ebf0a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTenantReceiver.java ---------------------------------------------------------------------- diff --git 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTenantReceiver.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTenantReceiver.java index bcdbd9e..48ba986 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTenantReceiver.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTenantReceiver.java @@ -25,6 +25,7 @@ import org.apache.stratos.load.balancer.context.LoadBalancerContext; import org.apache.stratos.messaging.domain.tenant.Tenant; import org.apache.stratos.messaging.domain.topology.Cluster; import org.apache.stratos.messaging.domain.topology.Service; +import org.apache.stratos.messaging.domain.topology.ServiceType; import org.apache.stratos.messaging.event.Event; import org.apache.stratos.messaging.event.tenant.CompleteTenantEvent; import org.apache.stratos.messaging.event.tenant.TenantSubscribedEvent; @@ -68,7 +69,9 @@ public class LoadBalancerTenantReceiver implements Runnable { CompleteTenantEvent completeTenantEvent = (CompleteTenantEvent) event; for (Tenant tenant : completeTenantEvent.getTenants()) { for (String serviceName : tenant.getServiceSubscriptions()) { - addTenantSubscriptionToLbContext(serviceName, tenant.getTenantId()); + if(isMultiTenantService(serviceName)) { + addTenantSubscriptionToLbContext(serviceName, tenant.getTenantId()); + } } } } @@ -77,19 +80,45 @@ public class LoadBalancerTenantReceiver implements Runnable { @Override protected void onEvent(Event event) { TenantSubscribedEvent tenantSubscribedEvent = (TenantSubscribedEvent) event; - addTenantSubscriptionToLbContext(tenantSubscribedEvent.getServiceName(), tenantSubscribedEvent.getTenantId()); + if(log.isDebugEnabled()) { + log.debug(String.format("Tenant subscribed event received: [tenant-id] %d [service] %s", + 
tenantSubscribedEvent.getTenantId(), tenantSubscribedEvent.getServiceName())); + } + if(isMultiTenantService(tenantSubscribedEvent.getServiceName())) { + addTenantSubscriptionToLbContext(tenantSubscribedEvent.getServiceName(), tenantSubscribedEvent.getTenantId()); + } } }); messageProcessorChain.addEventListener(new TenantUnSubscribedEventListener() { @Override protected void onEvent(Event event) { TenantUnSubscribedEvent tenantUnSubscribedEvent = (TenantUnSubscribedEvent) event; - removeTenantSubscriptionFromLbContext(tenantUnSubscribedEvent.getServiceName(), tenantUnSubscribedEvent.getTenantId()); + if(log.isDebugEnabled()) { + log.debug(String.format("Tenant un-subscribed event received: [tenant-id] %d [service] %s", + tenantUnSubscribedEvent.getTenantId(), tenantUnSubscribedEvent.getServiceName())); + } + if(isMultiTenantService(tenantUnSubscribedEvent.getServiceName())) { + removeTenantSubscriptionFromLbContext(tenantUnSubscribedEvent.getServiceName(), tenantUnSubscribedEvent.getTenantId()); + } } }); return messageProcessorChain; } + private boolean isMultiTenantService(String serviceName) { + try { + TopologyManager.acquireReadLock(); + Service service = TopologyManager.getTopology().getService(serviceName); + if(service != null) { + return (service.getServiceType() == ServiceType.MultiTenant); + } + return false; + } + finally { + TopologyManager.releaseReadLock(); + } + } + private void addTenantSubscriptionToLbContext(String serviceName, int tenantId) { // Find cluster of tenant Cluster cluster = findCluster(serviceName, tenantId); @@ -105,7 +134,7 @@ public class LoadBalancerTenantReceiver implements Runnable { clusterMap.put(tenantId, cluster); } if (log.isDebugEnabled()) { - log.debug(String.format("Cluster added to multi-tenant clusters map: [host-name] %s [tenant-id] %d [cluster] %s", + log.debug(String.format("Cluster added to multi-tenant cluster map: [host-name] %s [tenant-id] %d [cluster] %s", hostName, tenantId, cluster.getClusterId())); } } 
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/258ebf0a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/configurator/TopologyFilterConfigurator.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/configurator/TopologyFilterConfigurator.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/configurator/TopologyFilterConfigurator.java index e9b6326..59d8c2f 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/configurator/TopologyFilterConfigurator.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/configurator/TopologyFilterConfigurator.java @@ -19,6 +19,8 @@ package org.apache.stratos.load.balancer.conf.configurator; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.stratos.load.balancer.conf.LoadBalancerConfiguration; import org.apache.stratos.messaging.util.Constants; @@ -26,10 +28,17 @@ import org.apache.stratos.messaging.util.Constants; * Topology filter configurator to configure topology filters. 
*/ public class TopologyFilterConfigurator { + private static final Log log = LogFactory.getLog(TopologyFilterConfigurator.class); public static void configure(LoadBalancerConfiguration configuration) { System.setProperty(Constants.TOPOLOGY_SERVICE_FILTER, configuration.getTopologyServiceFilter()); System.setProperty(Constants.TOPOLOGY_CLUSTER_FILTER, configuration.getTopologyClusterFilter()); System.setProperty(Constants.TOPOLOGY_MEMBER_FILTER, configuration.getTopologyMemberFilter()); + + if(log.isDebugEnabled()) { /* log the filter values that were just written to the system properties */ + log.debug(String.format("service filter: %s", System.getProperty(Constants.TOPOLOGY_SERVICE_FILTER))); + log.debug(String.format("cluster filter: %s", System.getProperty(Constants.TOPOLOGY_CLUSTER_FILTER))); + log.debug(String.format("member filter: %s", System.getProperty(Constants.TOPOLOGY_MEMBER_FILTER))); + } } } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/258ebf0a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java index 48387c4..f5077df 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java @@ -33,6 +33,7 @@ import org.apache.stratos.load.balancer.conf.configurator.JndiConfigurator; import org.apache.stratos.load.balancer.conf.configurator.SynapseConfigurator; import org.apache.stratos.load.balancer.context.LoadBalancerContext; import 
org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter; +import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter; import org.apache.synapse.config.SynapseConfiguration; import org.apache.synapse.config.xml.MultiXMLConfigurationBuilder; @@ -161,6 +162,16 @@ public class LoadBalancerServiceComponent { } log.info(String.format("Cluster filter activated: [clusters] %s", sb.toString())); } + if (TopologyMemberFilter.getInstance().isActive()) { + StringBuilder sb = new StringBuilder(); + for (String clusterId : TopologyMemberFilter.getInstance().getIncludedLbClusterIds()) { + if (sb.length() > 0) { + sb.append(", "); + } + sb.append(clusterId); + } + log.info(String.format("Member filter activated: [lb-cluster-ids] %s", sb.toString())); + } } }
