Updated Branches: refs/heads/master 2dbf5598a -> 2fbd0fd38
Fixed tenant subscription publishing/receiving logic Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/2fbd0fd3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/2fbd0fd3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/2fbd0fd3 Branch: refs/heads/master Commit: 2fbd0fd380a1c617eef64128758ad87d4ecb7c3c Parents: 2dbf559 Author: Imesh Gunaratne <[email protected]> Authored: Sun Dec 8 10:01:20 2013 +0530 Committer: Imesh Gunaratne <[email protected]> Committed: Sun Dec 8 10:01:20 2013 +0530 ---------------------------------------------------------------------- .../mgt/publisher/TenantSynzhronizerTask.java | 26 ++++- .../balancer/LoadBalancerTenantReceiver.java | 105 +++++++++++-------- .../balancer/LoadBalancerTopologyReceiver.java | 64 ++--------- .../stratos/load/balancer/RequestDelegator.java | 14 +-- .../conf/LoadBalancerConfiguration.java | 17 +-- .../context/LoadBalancerContextUtil.java | 73 +++++++++++++ .../sample/configuration/loadbalancer2.conf | 4 +- .../stratos/messaging/domain/tenant/Tenant.java | 5 + .../distribution/src/main/conf/log4j.properties | 2 +- 9 files changed, 192 insertions(+), 118 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/2fbd0fd3/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/publisher/TenantSynzhronizerTask.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/publisher/TenantSynzhronizerTask.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/publisher/TenantSynzhronizerTask.java index 62c5103..d777c2d 100644 --- a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/publisher/TenantSynzhronizerTask.java +++ b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/publisher/TenantSynzhronizerTask.java @@ -21,7 +21,9 @@ package org.apache.stratos.adc.mgt.publisher; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.stratos.adc.mgt.dao.CartridgeSubscriptionInfo; import org.apache.stratos.adc.mgt.internal.DataHolder; +import org.apache.stratos.adc.mgt.utils.PersistenceManager; import org.apache.stratos.messaging.broker.publish.EventPublisher; import org.apache.stratos.messaging.domain.tenant.Tenant; import org.apache.stratos.messaging.event.tenant.CompleteTenantEvent; @@ -48,20 +50,34 @@ public class TenantSynzhronizerTask implements Task { @Override public void execute() { try { - if(log.isInfoEnabled()) { + if (log.isInfoEnabled()) { log.info(String.format("Publishing complete tenant event")); } + Tenant tenant; List<Tenant> tenants = new ArrayList<Tenant>(); TenantManager tenantManager = DataHolder.getRealmService().getTenantManager(); org.wso2.carbon.user.api.Tenant[] carbonTenants = tenantManager.getAllTenants(); - for(org.wso2.carbon.user.api.Tenant carbonTenant : carbonTenants) { - tenants.add(new Tenant(carbonTenant.getId(), carbonTenant.getDomain())); + for (org.wso2.carbon.user.api.Tenant carbonTenant : carbonTenants) { + // Create tenant + if(log.isDebugEnabled()) { + log.debug(String.format("Tenant found: [tenant-id] %d [tenant-domain] %s", carbonTenant.getId(), carbonTenant.getDomain())); + } + tenant = new Tenant(carbonTenant.getId(), carbonTenant.getDomain()); + // Add subscriptions + List<CartridgeSubscriptionInfo> subscriptions = PersistenceManager.getSubscriptionsForTenant(tenant.getTenantId()); + for (CartridgeSubscriptionInfo subscription : subscriptions) { + if(log.isDebugEnabled()) { + log.debug(String.format("Tenant subscription found: [tenant-id] %d [tenant-domain] %s [service] %s", + carbonTenant.getId(), carbonTenant.getDomain(), subscription.getCartridge())); + } + tenant.addServiceSubscription(subscription.getCartridge()); + } + tenants.add(tenant); } CompleteTenantEvent event = new CompleteTenantEvent(tenants); EventPublisher eventPublisher = new EventPublisher(Constants.TENANT_TOPIC); eventPublisher.publish(event); - } - catch (Exception e) { + } catch (Exception e) { if (log.isErrorEnabled()) { log.error("Could not publish complete tenant event", e); } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/2fbd0fd3/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTenantReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTenantReceiver.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTenantReceiver.java index ad17ec9..dc6e402 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTenantReceiver.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTenantReceiver.java @@ -22,11 +22,14 @@ package org.apache.stratos.load.balancer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.load.balancer.context.LoadBalancerContext; +import org.apache.stratos.messaging.domain.tenant.Tenant; import org.apache.stratos.messaging.domain.topology.Cluster; import org.apache.stratos.messaging.domain.topology.Service; import org.apache.stratos.messaging.event.Event; +import org.apache.stratos.messaging.event.tenant.CompleteTenantEvent; import org.apache.stratos.messaging.event.tenant.TenantSubscribedEvent; import org.apache.stratos.messaging.event.tenant.TenantUnSubscribedEvent; +import org.apache.stratos.messaging.listener.tenant.CompleteTenantEventListener; import org.apache.stratos.messaging.listener.tenant.TenantSubscribedEventListener; import org.apache.stratos.messaging.listener.tenant.TenantUnSubscribedEventListener; import org.apache.stratos.messaging.message.processor.tenant.TenantMessageProcessorChain; @@ -59,64 +62,80 @@ public class LoadBalancerTenantReceiver implements Runnable { private TenantMessageProcessorChain createEventProcessorChain() { TenantMessageProcessorChain messageProcessorChain = new TenantMessageProcessorChain(); - messageProcessorChain.addEventListener(new TenantSubscribedEventListener() { + messageProcessorChain.addEventListener(new CompleteTenantEventListener() { @Override protected void onEvent(Event event) { - TenantSubscribedEvent tenantSubscribedEvent = (TenantSubscribedEvent) event; - - // Find cluster of tenant - Cluster cluster = findCluster(tenantSubscribedEvent.getServiceName(), tenantSubscribedEvent.getTenantId()); - if(cluster != null) { - for(String hostName : cluster.getHostNames()) { - // Add hostName, tenantId, cluster to multi-tenant map - Map<Integer, Cluster> clusterMap = LoadBalancerContext.getInstance().getMultiTenantClusters(hostName); - if(clusterMap == null) { - clusterMap = new HashMap<Integer, Cluster>(); - clusterMap.put(tenantSubscribedEvent.getTenantId(), cluster); - LoadBalancerContext.getInstance().addMultiTenantClusters(hostName, clusterMap); - } - else { - clusterMap.put(tenantSubscribedEvent.getTenantId(), cluster); - } - if(log.isDebugEnabled()) { - log.debug(String.format("Cluster added to multi-tenant clusters map: [host-name] %s [tenant-id] %d [cluster] %s", - hostName, tenantSubscribedEvent.getTenantId(), cluster.getClusterId())); - } - } - } - else { - if(log.isErrorEnabled()) { - log.error(String.format("Could not find cluster of tenant: [service] %s [tenant-id] %d", - tenantSubscribedEvent.getServiceName(), tenantSubscribedEvent.getTenantId())); + CompleteTenantEvent completeTenantEvent = (CompleteTenantEvent) event; + for(Tenant tenant : completeTenantEvent.getTenants()) { + for(String serviceName : tenant.getServiceSubscriptions()) { + addTenantSubscriptionToLbContext(serviceName, tenant.getTenantId()); } } } }); + messageProcessorChain.addEventListener(new TenantSubscribedEventListener() { + @Override + protected void onEvent(Event event) { + TenantSubscribedEvent tenantSubscribedEvent = (TenantSubscribedEvent) event; + addTenantSubscriptionToLbContext(tenantSubscribedEvent.getServiceName(), tenantSubscribedEvent.getTenantId()); + } + }); messageProcessorChain.addEventListener(new TenantUnSubscribedEventListener() { @Override protected void onEvent(Event event) { TenantUnSubscribedEvent tenantUnSubscribedEvent = (TenantUnSubscribedEvent) event; + removeTenantSubscriptionFromLbContext(tenantUnSubscribedEvent.getServiceName(), tenantUnSubscribedEvent.getTenantId()); + } + }); + return messageProcessorChain; + } - // Find cluster of tenant - Cluster cluster = findCluster(tenantUnSubscribedEvent.getServiceName(), tenantUnSubscribedEvent.getTenantId()); - if(cluster != null) { - for(String hostName : cluster.getHostNames()) { - LoadBalancerContext.getInstance().removeMultiTenantClusters(hostName); - if(log.isDebugEnabled()) { - log.debug(String.format("Cluster removed from multi-tenant clusters map: [host-name] %s [tenant-id] %d [cluster] %s", - hostName, tenantUnSubscribedEvent.getTenantId(), cluster.getClusterId())); - } - } + private void addTenantSubscriptionToLbContext(String serviceName, int tenantId) { + // Find cluster of tenant + Cluster cluster = findCluster(serviceName, tenantId); + if(cluster != null) { + for(String hostName : cluster.getHostNames()) { + // Add hostName, tenantId, cluster to multi-tenant map + Map<Integer, Cluster> clusterMap = LoadBalancerContext.getInstance().getMultiTenantClusters(hostName); + if(clusterMap == null) { + clusterMap = new HashMap<Integer, Cluster>(); + clusterMap.put(tenantId, cluster); + LoadBalancerContext.getInstance().addMultiTenantClusters(hostName, clusterMap); } else { - if(log.isErrorEnabled()) { - log.error(String.format("Could not find cluster of tenant: [service] %s [tenant-id] %d", - tenantUnSubscribedEvent.getServiceName(), tenantUnSubscribedEvent.getTenantId())); - } + clusterMap.put(tenantId, cluster); + } + if(log.isDebugEnabled()) { + log.debug(String.format("Cluster added to multi-tenant clusters map: [host-name] %s [tenant-id] %d [cluster] %s", + hostName, tenantId, cluster.getClusterId())); } } - }); - return messageProcessorChain; + } + else { + if(log.isErrorEnabled()) { + log.error(String.format("Could not find cluster of tenant: [service] %s [tenant-id] %d", + serviceName, tenantId)); + } + } + } + + private void removeTenantSubscriptionFromLbContext(String serviceName, int tenantId) { + // Find cluster of tenant + Cluster cluster = findCluster(serviceName, tenantId); + if (cluster != null) { + for (String hostName : cluster.getHostNames()) { + LoadBalancerContext.getInstance().removeMultiTenantClusters(hostName); + if (log.isDebugEnabled()) { + log.debug(String.format("Cluster removed from multi-tenant clusters map: [host-name] %s [tenant-id] %d [cluster] %s", + hostName, tenantId, cluster.getClusterId())); + } + } + } else { + if (log.isErrorEnabled()) { + log.error(String.format("Could not find cluster of tenant: [service] %s [tenant-id] %d", + serviceName, tenantId)); + } + } } private Cluster findCluster(String serviceName, int tenantId) { http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/2fbd0fd3/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyReceiver.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyReceiver.java index 55ff817..bd5a0b1 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyReceiver.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyReceiver.java @@ -22,10 +22,10 @@ package org.apache.stratos.load.balancer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.load.balancer.context.LoadBalancerContext; +import org.apache.stratos.load.balancer.context.LoadBalancerContextUtil; import org.apache.stratos.messaging.domain.topology.Cluster; import org.apache.stratos.messaging.domain.topology.Member; import org.apache.stratos.messaging.domain.topology.Service; -import org.apache.stratos.messaging.domain.topology.ServiceType; import org.apache.stratos.messaging.event.Event; import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent; import org.apache.stratos.messaging.event.topology.MemberActivatedEvent; @@ -85,7 +85,7 @@ public class LoadBalancerTopologyReceiver implements Runnable { for (Service service : TopologyManager.getTopology().getServices()) { for (Cluster cluster : service.getClusters()) { if (hasActiveMembers(cluster)) { - addClusterToLbContext(cluster); + LoadBalancerContextUtil.addClusterToLbContext(cluster); } } } @@ -113,10 +113,9 @@ public class LoadBalancerTopologyReceiver implements Runnable { MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event; Cluster cluster = LoadBalancerContext.getInstance().getCluster(memberActivatedEvent.getClusterId()); if (cluster != null) { - addClusterToLbContext(cluster); - } - else { - if(log.isWarnEnabled()) { + LoadBalancerContextUtil.addClusterToLbContext(cluster); + } else { + if (log.isWarnEnabled()) { log.warn(String.format("Cluster not found in cluster id cluster map: [cluster] %s", memberActivatedEvent.getClusterId())); } } @@ -136,11 +135,10 @@ public class LoadBalancerTopologyReceiver implements Runnable { Cluster cluster = LoadBalancerContext.getInstance().getCluster(clusterRemovedEvent.getClusterId()); if (cluster != null) { for (String hostName : cluster.getHostNames()) { - removeClusterFromLbContext(hostName); + LoadBalancerContextUtil.removeClusterFromLbContext(hostName); } - } - else { - if(log.isWarnEnabled()) { + } else { + if (log.isWarnEnabled()) { log.warn(String.format("Cluster not found in cluster id cluster map: [cluster] %s", clusterRemovedEvent.getClusterId())); } } @@ -161,12 +159,11 @@ public class LoadBalancerTopologyReceiver implements Runnable { if (service != null) { for (Cluster cluster : service.getClusters()) { for (String hostName : cluster.getHostNames()) { - removeClusterFromLbContext(hostName); + LoadBalancerContextUtil.removeClusterFromLbContext(hostName); } } - } - else { - if(log.isWarnEnabled()) { + } else { + if (log.isWarnEnabled()) { log.warn(String.format("Service not found in topology: [service] %s", serviceRemovedEvent.getServiceName())); } } @@ -178,45 +175,6 @@ public class LoadBalancerTopologyReceiver implements Runnable { return processorChain; } - private void addClusterToLbContext(Cluster cluster) { - // Add cluster to Map<ClusterId, Cluster> - LoadBalancerContext.getInstance().addCluster(cluster); - - Service service = TopologyManager.getTopology().getService(cluster.getServiceName()); - if (service.getServiceType() == ServiceType.SingleTenant) { - // Add cluster to SingleTenantClusterMap - for (String hostName : cluster.getHostNames()) { - if (!LoadBalancerContext.getInstance().singleTenantClusterExists((hostName))) { - LoadBalancerContext.getInstance().addSingleTenantCluster(hostName, cluster); - if (log.isDebugEnabled()) { - log.debug(String.format("Cluster added to single tenant cluster map: [cluster] %s [hostname] %s", cluster.getClusterId(), hostName)); - } - } - } - } - // MultiTenantClusterMap is updated by tenant receiver. - } - - private void removeClusterFromLbContext(String clusterId) { - Cluster cluster = LoadBalancerContext.getInstance().getCluster(clusterId); - Service service = TopologyManager.getTopology().getService(cluster.getServiceName()); - if (service.getServiceType() == ServiceType.SingleTenant) { - // Remove cluster from SingleTenantClusterMap - for (String hostName : cluster.getHostNames()) { - if (LoadBalancerContext.getInstance().singleTenantClusterExists(hostName)) { - LoadBalancerContext.getInstance().removeSingleTenantCluster(hostName); - if (log.isDebugEnabled()) { - log.debug(String.format("Cluster removed from single tenant cluster map: [cluster] %s [hostname] %s", cluster.getClusterId(), hostName)); - } - } - } - } - // MultiTenantClusterMap is updated by tenant receiver. - - // Remove cluster from Map<ClusterId,Cluster> - LoadBalancerContext.getInstance().removeCluster(clusterId); - } - /** * Terminate load balancer topology receiver thread. */ http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/2fbd0fd3/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/RequestDelegator.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/RequestDelegator.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/RequestDelegator.java index 9b0a8a5..be09b31 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/RequestDelegator.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/RequestDelegator.java @@ -23,6 +23,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.load.balancer.algorithm.AlgorithmContext; import org.apache.stratos.load.balancer.algorithm.LoadBalanceAlgorithm; +import org.apache.stratos.load.balancer.conf.LoadBalancerConfiguration; import org.apache.stratos.load.balancer.context.LoadBalancerContext; import org.apache.stratos.messaging.domain.topology.Cluster; import org.apache.stratos.messaging.domain.topology.Member; @@ -116,14 +117,13 @@ public class RequestDelegator { } public boolean isTargetHostValid(String hostName) { - try { - if (hostName == null) - return false; + if (hostName == null) + return false; - TopologyManager.acquireReadLock(); - return LoadBalancerContext.getInstance().clusterExists(hostName); - } finally { - TopologyManager.releaseReadLock(); + boolean valid = LoadBalancerContext.getInstance().singleTenantClusterExists(hostName); + if ((!valid) && (LoadBalancerConfiguration.getInstance().isMultiTenancyEnabled())) { + valid = LoadBalancerContext.getInstance().multiTenantClustersExists(hostName); } + return valid; } } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/2fbd0fd3/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java index c94d55f..ae811a0 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java @@ -28,6 +28,7 @@ import org.apache.stratos.load.balancer.conf.structure.Node; import org.apache.stratos.load.balancer.conf.structure.NodeBuilder; import org.apache.stratos.load.balancer.conf.util.Constants; import org.apache.stratos.load.balancer.context.LoadBalancerContext; +import org.apache.stratos.load.balancer.context.LoadBalancerContextUtil; import org.apache.stratos.load.balancer.exception.InvalidConfigurationException; import org.apache.stratos.messaging.domain.topology.*; import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; @@ -238,7 +239,7 @@ public class LoadBalancerConfiguration { public LoadBalancerConfiguration readFromFile() { String configFilePath = System.getProperty("loadbalancer.conf.file"); - if(configFilePath == null){ + if (configFilePath == null) { throw new RuntimeException("loadbalancer.conf.file' system property is not set"); } @@ -302,8 +303,8 @@ public class LoadBalancerConfiguration { configuration.setMultiTenancyEnabled(Boolean.parseBoolean(multiTenancyEnabled)); } - // Read mb ip, port, topology service filter and topology cluster filter if topology event listener is enabled - if (configuration.isTopologyEventListenerEnabled()) { + // Read mb ip and port + if (configuration.isTopologyEventListenerEnabled() || configuration.isMultiTenancyEnabled()) { String mbIp = loadBalancerNode.getProperty(Constants.CONF_PROPERTY_MB_IP); validateRequiredPropertyInNode(Constants.CONF_PROPERTY_MB_IP, mbIp, "loadbalancer"); configuration.setMbIp(mbIp); @@ -311,7 +312,10 @@ public class LoadBalancerConfiguration { String mbPort = loadBalancerNode.getProperty(Constants.CONF_PROPERTY_MB_PORT); validateRequiredPropertyInNode(Constants.CONF_PROPERTY_MB_PORT, mbPort, "loadbalancer"); configuration.setMbPort(Integer.parseInt(mbPort)); + } + // Read topology service filter and topology cluster filter + if (configuration.isTopologyEventListenerEnabled()) { String serviceFilter = loadBalancerNode.getProperty(Constants.CONF_PROPERTY_TOPOLOGY_SERVICE_FILTER); if (StringUtils.isNotBlank(serviceFilter)) { configuration.setTopologyServiceFilter(serviceFilter); @@ -349,8 +353,7 @@ public class LoadBalancerConfiguration { validateRequiredPropertyInNode(Constants.CONF_PROPERTY_TENANT_IDENTIFIER_REGEX, tenantIdentifierRegex, "loadbalancer"); try { Pattern.compile(tenantIdentifierRegex); - } - catch (Exception e) { + } catch (Exception e) { throw new InvalidConfigurationException(String.format("Invalid tenant identifier regular expression: %s", tenantIdentifierRegex), e); } configuration.setTenantIdentifierRegex(tenantIdentifierRegex); @@ -419,8 +422,8 @@ public class LoadBalancerConfiguration { } service.addCluster(cluster); - // Add cluster to load balancer context Map<Hostname,Cluster> - LoadBalancerContext.getInstance().addCluster(cluster); + // Add cluster to load balancer context + LoadBalancerContextUtil.addClusterToLbContext(cluster); } // Add service to topology manager http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/2fbd0fd3/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/LoadBalancerContextUtil.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/LoadBalancerContextUtil.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/LoadBalancerContextUtil.java new file mode 100644 index 0000000..f644933 --- /dev/null +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/LoadBalancerContextUtil.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.load.balancer.context; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.domain.topology.Cluster; +import org.apache.stratos.messaging.domain.topology.Service; +import org.apache.stratos.messaging.domain.topology.ServiceType; +import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; + +/** + * Load balancer context utility class. + */ +public class LoadBalancerContextUtil { + private static final Log log = LogFactory.getLog(LoadBalancerContextUtil.class); + + public static void addClusterToLbContext(Cluster cluster) { + // Add cluster to Map<ClusterId, Cluster> + LoadBalancerContext.getInstance().addCluster(cluster); + + Service service = TopologyManager.getTopology().getService(cluster.getServiceName()); + if (service.getServiceType() == ServiceType.SingleTenant) { + // Add cluster to SingleTenantClusterMap + for (String hostName : cluster.getHostNames()) { + if (!LoadBalancerContext.getInstance().singleTenantClusterExists((hostName))) { + LoadBalancerContext.getInstance().addSingleTenantCluster(hostName, cluster); + if (log.isDebugEnabled()) { + log.debug(String.format("Cluster added to single tenant cluster map: [cluster] %s [hostname] %s", cluster.getClusterId(), hostName)); + } + } + } + } + // MultiTenantClusterMap is updated by tenant receiver. + } + + public static void removeClusterFromLbContext(String clusterId) { + Cluster cluster = LoadBalancerContext.getInstance().getCluster(clusterId); + Service service = TopologyManager.getTopology().getService(cluster.getServiceName()); + if (service.getServiceType() == ServiceType.SingleTenant) { + // Remove cluster from SingleTenantClusterMap + for (String hostName : cluster.getHostNames()) { + if (LoadBalancerContext.getInstance().singleTenantClusterExists(hostName)) { + LoadBalancerContext.getInstance().removeSingleTenantCluster(hostName); + if (log.isDebugEnabled()) { + log.debug(String.format("Cluster removed from single tenant cluster map: [cluster] %s [hostname] %s", cluster.getClusterId(), hostName)); + } + } + } + } + // MultiTenantClusterMap is updated by tenant receiver. + + // Remove cluster from Map<ClusterId,Cluster> + LoadBalancerContext.getInstance().removeCluster(clusterId); + } +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/2fbd0fd3/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer2.conf ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer2.conf b/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer2.conf index 1a345a9..dac8f0b 100755 --- a/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer2.conf +++ b/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer2.conf @@ -48,8 +48,8 @@ loadbalancer { # Message broker endpoint # Provide message broker ip address and port if topology-event-listener or multi-tenancy is set to true. - # mb-ip: localhost; - # mb-port: 5677; + mb-ip: localhost; + mb-port: 5677; # Topology service filter # Provide service names in a comma separated list to filter incoming topology events if http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/2fbd0fd3/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/tenant/Tenant.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/tenant/Tenant.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/tenant/Tenant.java index f4e3a0c..bc4244a 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/tenant/Tenant.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/tenant/Tenant.java @@ -20,6 +20,7 @@ package org.apache.stratos.messaging.domain.tenant; import java.io.Serializable; +import java.util.Collection; import java.util.HashMap; import java.util.Map; @@ -52,6 +53,10 @@ public class Tenant implements Serializable{ this.tenantDomain = tenantDomain; } + public Collection<String> getServiceSubscriptions() { + return serviceNameMap.keySet(); + } + public boolean isServiceSubscribed(String serviceName) { return serviceNameMap.containsKey(serviceName); } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/2fbd0fd3/products/load-balancer/modules/distribution/src/main/conf/log4j.properties ---------------------------------------------------------------------- diff --git a/products/load-balancer/modules/distribution/src/main/conf/log4j.properties b/products/load-balancer/modules/distribution/src/main/conf/log4j.properties index 1942895..3a23f46 100644 --- a/products/load-balancer/modules/distribution/src/main/conf/log4j.properties +++ b/products/load-balancer/modules/distribution/src/main/conf/log4j.properties @@ -111,7 +111,7 @@ log4j.appender.CARBON_LOGFILE.File=${carbon.home}/repository/logs/${instance.log log4j.appender.CARBON_LOGFILE.Append=true log4j.appender.CARBON_LOGFILE.layout=org.wso2.carbon.utils.logging.TenantAwarePatternLayout # ConversionPattern will be overridden by the configuration setting in the DB -log4j.appender.CARBON_LOGFILE.layout.ConversionPattern=TID: [%T] [%S] [%d] %P%5p {%c} - %x %m {%c}%n +log4j.appender.CARBON_LOGFILE.layout.ConversionPattern=TID: [%T] [%S] [%d] %P%5p {%c} - %x %m %n log4j.appender.CARBON_LOGFILE.layout.TenantPattern=%U%@%D [%T] [%S] log4j.appender.CARBON_LOGFILE.threshold=DEBUG
