Repository: incubator-stratos Updated Branches: refs/heads/master 172740958 -> 4097941c8
Fixing STRATOS-508. Removing subscription check when adding clusters to cluster map in information model. Modify LoadBalancerCategory to fix LB cluster-id issue. Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/7b3a26d5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/7b3a26d5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/7b3a26d5 Branch: refs/heads/master Commit: 7b3a26d5a46ec0809c2854ac35b220e1747dda35 Parents: 9694fa2 Author: Sajith Kariyawasam <[email protected]> Authored: Wed Mar 12 17:22:03 2014 +0530 Committer: Sajith Kariyawasam <[email protected]> Committed: Wed Mar 12 17:22:03 2014 +0530 ---------------------------------------------------------------------- .../lb/category/LoadBalancerCategory.java | 86 +++++ .../manager/CartridgeSubscriptionManager.java | 14 +- .../RegistryBasedPersistenceManager.java | 5 +- .../utils/CartridgeSubscriptionUtils.java | 4 +- .../model/TopologyClusterInformationModel.java | 315 +++---------------- .../StratosManagerTopologyReceiver.java | 149 ++------- 6 files changed, 185 insertions(+), 388 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7b3a26d5/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/lb/category/LoadBalancerCategory.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/lb/category/LoadBalancerCategory.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/lb/category/LoadBalancerCategory.java index 906d2a9..3352935 100644 --- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/lb/category/LoadBalancerCategory.java +++ 
b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/lb/category/LoadBalancerCategory.java @@ -19,11 +19,28 @@ package org.apache.stratos.manager.lb.category; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.cloud.controller.pojo.CartridgeInfo; import org.apache.stratos.manager.behaviour.CartridgeMgtBehaviour; +import org.apache.stratos.manager.dao.Cluster; +import org.apache.stratos.manager.deploy.service.Service; +import org.apache.stratos.manager.exception.ADCException; +import org.apache.stratos.manager.exception.AlreadySubscribedException; +import org.apache.stratos.manager.exception.PersistenceManagerException; +import org.apache.stratos.manager.payload.PayloadData; +import org.apache.stratos.manager.repository.Repository; +import org.apache.stratos.manager.retriever.DataInsertionAndRetrievalManager; +import org.apache.stratos.manager.subscriber.Subscriber; +import org.apache.stratos.manager.subscription.utils.CartridgeSubscriptionUtils; public abstract class LoadBalancerCategory extends CartridgeMgtBehaviour { private String loadBalancedServiceType; + private boolean isLoadBalancedServiceMultiTenant; + private static Log log = LogFactory.getLog(LoadBalancerCategory.class); public String getLoadBalancedServiceType() { return loadBalancedServiceType; @@ -32,4 +49,73 @@ public abstract class LoadBalancerCategory extends CartridgeMgtBehaviour { public void setLoadBalancedServiceType(String loadBalancedServiceType) { this.loadBalancedServiceType = loadBalancedServiceType; } + + public PayloadData create(String alias, Cluster cluster, + Subscriber subscriber, Repository repository, + CartridgeInfo cartridgeInfo, String subscriptionKey, + Map<String, String> customPayloadEntries) throws ADCException, + AlreadySubscribedException { + + String clusterId; + + if (isLoadBalancedServiceMultiTenant) { + // the load balancer should be already up and 
running from service + // cluster deployment + + // get the cluster domain and host name from deployed Service + + Service deployedLBService; + try { + deployedLBService = new DataInsertionAndRetrievalManager() + .getService(cartridgeInfo.getType()); + + } catch (PersistenceManagerException e) { + String errorMsg = "Error in checking if Service is available is PersistenceManager"; + log.error(errorMsg, e); + throw new ADCException(errorMsg, e); + } + + if (deployedLBService == null) { + String errorMsg = "There is no deployed Service for type " + + cartridgeInfo.getType(); + log.error(errorMsg); + throw new ADCException(errorMsg); + } + + if(log.isDebugEnabled()){ + log.debug(" Setting cluster Domain : " + deployedLBService.getClusterId()); + log.debug(" Setting Host Name : " + deployedLBService.getHostName()); + } + + // set the cluster and hostname + cluster.setClusterDomain(deployedLBService.getClusterId()); + cluster.setHostName(deployedLBService.getHostName()); + + } else { + clusterId = alias + "." + cartridgeInfo.getType() + ".domain"; + + // limit the cartridge alias to 30 characters in length + if (clusterId.length() > 30) { + clusterId = CartridgeSubscriptionUtils.limitLengthOfString( + clusterId, 30); + } + cluster.setClusterDomain(clusterId); + // set hostname + cluster.setHostName(alias + "." 
+ cluster.getHostName()); + } + + return createPayload(cartridgeInfo, subscriptionKey, subscriber, + cluster, repository, alias, customPayloadEntries); + } + + public boolean isLoadBalancedServiceMultiTenant() { + return isLoadBalancedServiceMultiTenant; + } + + public void setLoadBalancedServiceMultiTenant( + boolean isLoadBalancedServiceMultiTenant) { + this.isLoadBalancedServiceMultiTenant = isLoadBalancedServiceMultiTenant; + } + + } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7b3a26d5/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/manager/CartridgeSubscriptionManager.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/manager/CartridgeSubscriptionManager.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/manager/CartridgeSubscriptionManager.java index bfc62de..9688fff 100644 --- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/manager/CartridgeSubscriptionManager.java +++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/manager/CartridgeSubscriptionManager.java @@ -115,7 +115,7 @@ public class CartridgeSubscriptionManager { subscriptionData.getDeploymentPolicyName(), lbConfig); // subscribe to LB - lbCartridgeSubscription = subscribeToLB (subscriptionData, lbDataCtxt); + lbCartridgeSubscription = subscribeToLB (subscriptionData, lbDataCtxt, cartridgeInfo); lbCartridgeSubscriptionProperties = new Properties(); if (lbDataCtxt.getLbProperperties() != null && !lbDataCtxt.getLbProperperties().isEmpty()) { @@ -149,12 +149,14 @@ public class CartridgeSubscriptionManager { return registerCartridgeSubscription(serviceCartridgeSubscription, serviceCartridgeSubscriptionProperties); } - private CartridgeSubscription subscribeToLB (SubscriptionData subscriptionData, LBDataContext lbDataContext) + private 
CartridgeSubscription subscribeToLB (SubscriptionData subscriptionData, LBDataContext lbDataContext, + CartridgeInfo serviceCartridgeInfo) throws ADCException, InvalidCartridgeAliasException, DuplicateCartridgeAliasException, PolicyException, UnregisteredCartridgeException, RepositoryRequiredException, RepositoryCredentialsRequiredException, RepositoryTransportException, AlreadySubscribedException, InvalidRepositoryException { + if (lbDataContext.getLbCategory() == null || lbDataContext.getLbCategory().equals(Constants.NO_LOAD_BALANCER)) { // no load balancer subscription required log.info("No LB subscription required for the Subscription with alias: " + subscriptionData.getCartridgeAlias() + ", type: " + @@ -185,6 +187,9 @@ public class CartridgeSubscriptionManager { } // Set the load balanced service type loadBalancerCategory.setLoadBalancedServiceType(subscriptionData.getCartridgeType()); + + // Set if the load balanced service is multi tenant or not + loadBalancerCategory.setLoadBalancedServiceMultiTenant(serviceCartridgeInfo.getMultiTenant()); // Create the CartridgeSubscription instance CartridgeSubscription cartridgeSubscription = CartridgeSubscriptionFactory.getLBCartridgeSubscriptionInstance(lbDataContext, loadBalancerCategory); @@ -323,8 +328,9 @@ public class CartridgeSubscriptionManager { cartridgeSubscription.removeSubscription(); // Remove the information from Topology Model - TopologyClusterInformationModel.getInstance().removeCluster(cartridgeSubscription.getSubscriber().getTenantId(), - cartridgeSubscription.getType(), cartridgeSubscription.getAlias()); + // Not needed now. 
TopologyModel is now changed so that information is taken from subscriptions + //TopologyClusterInformationModel.getInstance().removeCluster(cartridgeSubscription.getSubscriber().getTenantId(), + // cartridgeSubscription.getType(), cartridgeSubscription.getAlias()); // remove subscription try { http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7b3a26d5/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/persistence/RegistryBasedPersistenceManager.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/persistence/RegistryBasedPersistenceManager.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/persistence/RegistryBasedPersistenceManager.java index 0070e82..6ecee93 100644 --- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/persistence/RegistryBasedPersistenceManager.java +++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/persistence/RegistryBasedPersistenceManager.java @@ -174,7 +174,10 @@ public class RegistryBasedPersistenceManager extends PersistenceManager { log.debug("Traversing resource path " + subscriptionResourcePath); } - cartridgeSubscriptions.addAll(traverseAndGetCartridgeSubscriptions(subscriptionResourcePath)); + Collection<CartridgeSubscription> cartridgeSubscriptionSet = traverseAndGetCartridgeSubscriptions(subscriptionResourcePath); + if (cartridgeSubscriptionSet != null) { + cartridgeSubscriptions.addAll(cartridgeSubscriptionSet); + } } } else { http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7b3a26d5/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/utils/CartridgeSubscriptionUtils.java ---------------------------------------------------------------------- diff --git 
a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/utils/CartridgeSubscriptionUtils.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/utils/CartridgeSubscriptionUtils.java index e8b572c..9ee5d71 100644 --- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/utils/CartridgeSubscriptionUtils.java +++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/utils/CartridgeSubscriptionUtils.java @@ -81,9 +81,9 @@ public class CartridgeSubscriptionUtils { } //TODO:remove. we do not want to know about the tenant rance in subscription! - if(cartridgeInfo.getMultiTenant() || subscriber.getTenantId() == -1234) { //TODO: fix properly + if(cartridgeInfo.getMultiTenant()) { //TODO: fix properly basicPayloadData.setTenantRange("*"); - } else { + } else if (subscriber != null) { basicPayloadData.setTenantRange(String.valueOf(subscriber.getTenantId())); } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7b3a26d5/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/model/TopologyClusterInformationModel.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/model/TopologyClusterInformationModel.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/model/TopologyClusterInformationModel.java index a6ac140..67b8592 100644 --- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/model/TopologyClusterInformationModel.java +++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/model/TopologyClusterInformationModel.java @@ -19,13 +19,19 @@ package org.apache.stratos.manager.topology.model; +import java.util.Collection; +import 
java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.locks.ReentrantReadWriteLock; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.stratos.manager.retriever.DataInsertionAndRetrievalManager; +import org.apache.stratos.manager.subscription.CartridgeSubscription; import org.apache.stratos.messaging.domain.topology.Cluster; -import org.apache.stratos.messaging.domain.topology.Member; - -import java.util.*; -import java.util.concurrent.locks.ReentrantReadWriteLock; public class TopologyClusterInformationModel { @@ -33,14 +39,15 @@ public class TopologyClusterInformationModel { private Map<Integer, Set<CartridgeTypeContext>> tenantIdToCartridgeTypeContextMap; private static TopologyClusterInformationModel topologyClusterInformationModel; + private Map<String, Cluster> clusterIdToClusterMap; //locks private static volatile ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); - private static volatile ReentrantReadWriteLock.ReadLock readLock = lock.readLock(); private static volatile ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock(); private TopologyClusterInformationModel() { tenantIdToCartridgeTypeContextMap = new HashMap<Integer, Set<CartridgeTypeContext>>(); + clusterIdToClusterMap = new HashMap<String, Cluster>(); } public static TopologyClusterInformationModel getInstance () { @@ -54,269 +61,44 @@ public class TopologyClusterInformationModel { return topologyClusterInformationModel; } - - public void addCluster (int tenantId, String cartridgeType, String subscriptionAlias, Cluster cluster) { - - Set<CartridgeTypeContext> cartridgeTypeContextSet = null; - Set<SubscriptionAliasContext> subscriptionAliasContextSet = null; - - writeLock.lock(); - try { - //check if a set of CartridgeTypeContext instances already exist for given tenant Id - cartridgeTypeContextSet = 
tenantIdToCartridgeTypeContextMap.get(tenantId); - if(cartridgeTypeContextSet != null) { - CartridgeTypeContext cartridgeTypeContext = null; - //iterate through the set - Iterator<CartridgeTypeContext> typeCtxIterator = cartridgeTypeContextSet.iterator(); - while (typeCtxIterator.hasNext()) { - //see if the set contains a CartridgeTypeContext instance with the given cartridge type - cartridgeTypeContext = typeCtxIterator.next(); - if (cartridgeTypeContext.getType().equals(cartridgeType)){ - //if so, get the SubscriptionAliasContext set - subscriptionAliasContextSet = cartridgeTypeContext.getSubscriptionAliasContextSet(); - break; - } - } - //check if a SubscriptionAliasContext set is not found - if(subscriptionAliasContextSet == null) { - //no SubscriptionAliasContext instance - //create a new SubscriptionAliasContext instance - SubscriptionAliasContext subscriptionAliasContext = new SubscriptionAliasContext(subscriptionAlias, - cluster); - //create a SubscriptionAliasContext set - subscriptionAliasContextSet = new HashSet<SubscriptionAliasContext>(); - //add the created SubscriptionAliasContext instance to SubscriptionAliasContext set - subscriptionAliasContextSet.add(subscriptionAliasContext); - //set it to the CartridgeTypeContext instance - cartridgeTypeContext = new CartridgeTypeContext(cartridgeType); - cartridgeTypeContext.setSubscriptionAliasContextSet(subscriptionAliasContextSet); - //add to the cartridgeTypeContextSet - cartridgeTypeContextSet.add(cartridgeTypeContext); - - if (log.isDebugEnabled()) { - log.debug("New cluster added : " + cluster.toString()); - Collection<Member> members = cluster.getMembers(); - if (members != null && !members.isEmpty()) { - for (Member member : members) { - log.debug("[ " + member.getServiceName() + ", " + member.getClusterId() + ", "+ member.getMemberId() + " ]"); - } - } - } - - } else { - //iterate through the set - /*Iterator<SubscriptionAliasContext> aliasIterator = subscriptionAliasContextSet.iterator(); - while 
(aliasIterator.hasNext()) { - //see if the set contains a SubscriptionAliasContext instance with the given alias - SubscriptionAliasContext subscriptionAliasContext = aliasIterator.next(); - if (subscriptionAliasContext.getSubscriptionAlias().equals(subscriptionAlias)) { - //remove the existing one - aliasIterator.remove(); - break; - } - }*/ - // remove the existing one - boolean existingClusterRemoved = subscriptionAliasContextSet.remove(new SubscriptionAliasContext(subscriptionAlias, null)); - - //now, add the new cluster object - subscriptionAliasContextSet.add(new SubscriptionAliasContext(subscriptionAlias, cluster)); - - if (log.isDebugEnabled()) { - // check if cluster was overwritten - if (existingClusterRemoved) { - log.debug("Existing cluster found, updated : " + cluster.toString()); - Collection<Member> members = cluster.getMembers(); - if (members != null && !members.isEmpty()) { - for (Member member : members) { - log.debug("[ " + member.getServiceName() + ", " + member.getClusterId() + ", "+ member.getMemberId() + " ]"); - } - } - } else { - log.debug("New cluster added : " + cluster.toString()); - Collection<Member> members = cluster.getMembers(); - if (members != null && !members.isEmpty()) { - for (Member member : members) { - log.debug("[ " + member.getServiceName() + ", " + member.getClusterId() + ", "+ member.getMemberId() + " ]"); - } - } - } - } - } - - } else { - //no entries for this tenant, go from down to top creating relevant objects and populating them - //create a new SubscriptionAliasContext instance - SubscriptionAliasContext subscriptionAliasContext = new SubscriptionAliasContext(subscriptionAlias, - cluster); - //create a SubscriptionAliasContext set - subscriptionAliasContextSet = new HashSet<SubscriptionAliasContext>(); - //add the created SubscriptionAliasContext instance to SubscriptionAliasContext set - subscriptionAliasContextSet.add(subscriptionAliasContext); - - //create a new CartridgeTypeContext instance - 
CartridgeTypeContext cartridgeTypeContext = new CartridgeTypeContext(cartridgeType); - //link the SubscriptionAliasContextSet to it - cartridgeTypeContext.setSubscriptionAliasContextSet(subscriptionAliasContextSet); - - //Create CartridgeTypeContext instance - cartridgeTypeContextSet = new HashSet<CartridgeTypeContext>(); - //link the SubscriptionAliasContext set to CartridgeTypeContext instance - //////////////cartridgeTypeContext.setSubscriptionAliasContextSet(subscriptionAliasContextSet); - cartridgeTypeContextSet.add(cartridgeTypeContext); - - //link the CartridgeTypeContext set to the [tenant Id -> CartridgeTypeContext] map - tenantIdToCartridgeTypeContextMap.put(tenantId, cartridgeTypeContextSet); - - if (log.isDebugEnabled()) { - log.debug("New cluster added : " + cluster.toString()); - Collection<Member> members = cluster.getMembers(); - if (members != null && !members.isEmpty()) { - for (Member member : members) { - log.debug("[ " + member.getServiceName() + ", " + member.getClusterId() + ", "+ member.getMemberId() + " ]"); - } - } - } - } - - } finally { - writeLock.unlock(); - } - } + + public void addCluster (Cluster cluster) { + if(log.isDebugEnabled()) { + log.debug(" Adding cluster ["+cluster.getClusterId()+"] "); + } + clusterIdToClusterMap.put(cluster.getClusterId(), cluster); + } public Cluster getCluster (int tenantId, String cartridgeType, String subscriptionAlias) { - - Set<CartridgeTypeContext> cartridgeTypeContextSet = null; - Set<SubscriptionAliasContext> subscriptionAliasContextSet = null; - - readLock.lock(); - try { - //check if a set of CartridgeTypeContext instances already exist for given tenant Id - cartridgeTypeContextSet = tenantIdToCartridgeTypeContextMap.get(tenantId); - if(cartridgeTypeContextSet != null) { - CartridgeTypeContext cartridgeTypeContext = null; - //iterate through the set - Iterator<CartridgeTypeContext> typeCtxIterator = cartridgeTypeContextSet.iterator(); - while (typeCtxIterator.hasNext()) { - //see if the set 
contains a CartridgeTypeContext instance with the given cartridge type - cartridgeTypeContext = typeCtxIterator.next(); - if (cartridgeTypeContext.getType().equals(cartridgeType)){ - //if so, get the SubscriptionAliasContext set - subscriptionAliasContextSet = cartridgeTypeContext.getSubscriptionAliasContextSet(); - break; - } - } - if(subscriptionAliasContextSet != null) { - //iterate through the set - Iterator<SubscriptionAliasContext> aliasIterator = subscriptionAliasContextSet.iterator(); - while (aliasIterator.hasNext()) { - //see if the set contains a SubscriptionAliasContext instance with the given alias - SubscriptionAliasContext subscriptionAliasContext = aliasIterator.next(); - if (subscriptionAliasContext.equals(new SubscriptionAliasContext(subscriptionAlias, null))) { - - if (log.isDebugEnabled()) { - log.debug("Matching cluster found for tenant " + tenantId + ", type " + cartridgeType + - ", subscription alias " + subscriptionAlias + ": " + subscriptionAliasContext.getCluster().toString()); - Collection<Member> members = subscriptionAliasContext.getCluster().getMembers(); - if (members != null && !members.isEmpty()) { - for (Member member : members) { - log.debug("[ " + member.getServiceName() + ", " + member.getClusterId() + ", "+ member.getMemberId() + " ]"); - } - } - } - - return subscriptionAliasContext.getCluster(); - } - } - } - } - - } finally { - readLock.unlock(); - } - - return null; + + DataInsertionAndRetrievalManager dx = new DataInsertionAndRetrievalManager(); + String clusterId = dx.getCartridgeSubscription(tenantId, subscriptionAlias).getClusterDomain(); + Cluster cluster = clusterIdToClusterMap.get(clusterId); + if(log.isDebugEnabled()) { + log.debug(" Found cluster ["+cluster+"] with id ["+clusterId+"] "); + } + return cluster; } - + public Set<Cluster> getClusters (int tenantId, String cartridgeType) { - - Set<CartridgeTypeContext> cartridgeTypeContextSet = null; - Set<SubscriptionAliasContext> subscriptionAliasContextSet = null; - 
Set<Cluster> clusterSet = new HashSet<Cluster>(); - - readLock.lock(); - try { - cartridgeTypeContextSet = tenantIdToCartridgeTypeContextMap.get(tenantId); - if(cartridgeTypeContextSet != null) { - //iterate through the set - Iterator<CartridgeTypeContext> typeCtxIterator = cartridgeTypeContextSet.iterator(); - while (typeCtxIterator.hasNext()) { - //iterate and get each of SubscriptionAliasContext sets - CartridgeTypeContext cartridgeTypeContext = typeCtxIterator.next(); - - if (cartridgeType != null) { - // check if CartridgeTypeContext instance matches the cartridgeType - if (cartridgeTypeContext.equals(new CartridgeTypeContext(cartridgeType))) { - - subscriptionAliasContextSet = cartridgeTypeContext.getSubscriptionAliasContextSet(); - - if (subscriptionAliasContextSet != null) { - //iterate and convert to Cluster set - Iterator<SubscriptionAliasContext> aliasCtxIterator = subscriptionAliasContextSet.iterator(); - - while (aliasCtxIterator.hasNext()) { - Cluster cluster = aliasCtxIterator.next().getCluster(); - // add the cluster to the set - clusterSet.add(cluster); - - if (log.isDebugEnabled()) { - log.debug("Matching cluster found for tenant " + tenantId + " : " + cluster.toString()); - Collection<Member> members = cluster.getMembers(); - if (members != null && !members.isEmpty()) { - for (Member member : members) { - log.debug("[ " + member.getServiceName() + ", " + member.getClusterId() + ", "+ member.getMemberId() + " ]"); - } - } - } - } - } - } - - } else { - // no cartridgeType specified - subscriptionAliasContextSet = cartridgeTypeContext.getSubscriptionAliasContextSet(); - - if (subscriptionAliasContextSet != null) { - //iterate and convert to Cluster set - Iterator<SubscriptionAliasContext> aliasCtxIterator = subscriptionAliasContextSet.iterator(); - - //clusterSet = new HashSet<Cluster>(); - while (aliasCtxIterator.hasNext()) { - Cluster cluster = aliasCtxIterator.next().getCluster(); - // add the cluster to the set - clusterSet.add(cluster); - - if 
(log.isDebugEnabled()) { - log.debug("Matching cluster found for tenant " + tenantId + ", type " + cartridgeType + " : " + cluster.toString()); - Collection<Member> members = cluster.getMembers(); - if (members != null && !members.isEmpty()) { - for (Member member : members) { - log.debug("[ " + member.getServiceName() + ", " + member.getClusterId() + ", "+ member.getMemberId() + " ]"); - } - } - } - } - } - } - } - } - - } finally { - readLock.unlock(); - } - - return clusterSet; + Set<Cluster> clusterSet = new HashSet<Cluster>(); + DataInsertionAndRetrievalManager dx = new DataInsertionAndRetrievalManager(); + Collection<CartridgeSubscription> subscriptions = null; + if(cartridgeType != null) { + subscriptions = dx.getCartridgeSubscriptions(tenantId, cartridgeType); + }else { + subscriptions = dx.getCartridgeSubscriptions(tenantId); + } + + if (subscriptions != null) { + for (CartridgeSubscription cartridgeSubscription : subscriptions) { + String clusterId = cartridgeSubscription.getClusterDomain(); + clusterSet.add(clusterIdToClusterMap.get(clusterId)); + } + } + return clusterSet; } - + public void removeCluster (int tenantId, String cartridgeType, String subscriptionAlias) { Set<CartridgeTypeContext> cartridgeTypeContextSet = null; @@ -364,6 +146,13 @@ public class TopologyClusterInformationModel { writeLock.unlock(); } } + + public void removeCluster (String clusterId) { + if(log.isDebugEnabled()) { + log.debug(" Removing cluster ["+clusterId+"] "); + } + clusterIdToClusterMap.remove(clusterId); + } private class CartridgeTypeContext { http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7b3a26d5/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyReceiver.java 
b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyReceiver.java index 75e4b67..75f9752 100644 --- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyReceiver.java +++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyReceiver.java @@ -21,22 +21,31 @@ package org.apache.stratos.manager.topology.receiver; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.stratos.manager.retriever.DataInsertionAndRetrievalManager; -import org.apache.stratos.manager.subscription.CartridgeSubscription; import org.apache.stratos.manager.topology.model.TopologyClusterInformationModel; import org.apache.stratos.messaging.domain.topology.Cluster; import org.apache.stratos.messaging.domain.topology.Member; import org.apache.stratos.messaging.domain.topology.Service; import org.apache.stratos.messaging.event.Event; -import org.apache.stratos.messaging.event.topology.*; -import org.apache.stratos.messaging.listener.topology.*; +import org.apache.stratos.messaging.event.topology.ClusterCreatedEvent; +import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent; +import org.apache.stratos.messaging.event.topology.InstanceSpawnedEvent; +import org.apache.stratos.messaging.event.topology.MemberActivatedEvent; +import org.apache.stratos.messaging.event.topology.MemberStartedEvent; +import org.apache.stratos.messaging.event.topology.MemberSuspendedEvent; +import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent; +import org.apache.stratos.messaging.listener.topology.ClusterCreatedEventListener; +import org.apache.stratos.messaging.listener.topology.ClusterRemovedEventListener; +import org.apache.stratos.messaging.listener.topology.CompleteTopologyEventListener; +import 
org.apache.stratos.messaging.listener.topology.InstanceSpawnedEventListener; +import org.apache.stratos.messaging.listener.topology.MemberActivatedEventListener; +import org.apache.stratos.messaging.listener.topology.MemberStartedEventListener; +import org.apache.stratos.messaging.listener.topology.MemberSuspendedEventListener; +import org.apache.stratos.messaging.listener.topology.MemberTerminatedEventListener; import org.apache.stratos.messaging.message.processor.topology.TopologyMessageProcessorChain; import org.apache.stratos.messaging.message.receiver.topology.TopologyEventMessageDelegator; import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; import org.apache.stratos.messaging.message.receiver.topology.TopologyReceiver; -import java.util.Set; - public class StratosManagerTopologyReceiver implements Runnable { private static final Log log = LogFactory.getLog(StratosManagerTopologyReceiver.class); @@ -71,19 +80,10 @@ public class StratosManagerTopologyReceiver implements Runnable { for (Service service : TopologyManager.getTopology().getServices()) { //iterate through all clusters for (Cluster cluster : service.getClusters()) { - //get subscription details - Set<CartridgeSubscription> cartridgeSubscriptions = getCartridgeSubscription(cluster.getClusterId()); - - if(cartridgeSubscriptions != null) { - // iterate and do the relevant changes - for (CartridgeSubscription cartridgeSubscription : cartridgeSubscriptions) { - //add the information to Topology Cluster Info. 
model - TopologyClusterInformationModel.getInstance().addCluster(cartridgeSubscription.getSubscriber().getTenantId(), - cartridgeSubscription.getType(), cartridgeSubscription.getAlias(), cluster); - } + TopologyClusterInformationModel.getInstance().addCluster(cluster); } } - } + } finally { TopologyManager.releaseReadLock(); } @@ -98,10 +98,6 @@ public class StratosManagerTopologyReceiver implements Runnable { log.info("********** [ClusterCreatedEventListener] Received: " + event.getClass() + " **********"); ClusterCreatedEvent clustercreatedEvent = (ClusterCreatedEvent) event; - //get subscription details - Set<CartridgeSubscription> cartridgeSubscriptions = getCartridgeSubscription(clustercreatedEvent.getClusterId()); - - if(cartridgeSubscriptions != null) { String serviceType = clustercreatedEvent.getServiceName(); //acquire read lock @@ -109,48 +105,28 @@ public class StratosManagerTopologyReceiver implements Runnable { try { Cluster cluster = TopologyManager.getTopology().getService(serviceType).getCluster(clustercreatedEvent.getClusterId()); - - // iterate and do the relevant changes - for (CartridgeSubscription cartridgeSubscription : cartridgeSubscriptions) { - - //add the information to Topology Cluster Info. model - TopologyClusterInformationModel.getInstance().addCluster(cartridgeSubscription.getSubscriber().getTenantId(), - cartridgeSubscription.getType(), cartridgeSubscription.getAlias(), cluster); - } + TopologyClusterInformationModel.getInstance().addCluster(cluster); } finally { //release read lock TopologyManager.releaseReadLock(); } - } + } }); - // Removal of cluster is done in the unsubscription, therefore commenting this listener. 
//Cluster Removed event listner - /*processorChain.addEventListener(new ClusterRemovedEventListener() { + processorChain.addEventListener(new ClusterRemovedEventListener() { @Override protected void onEvent(Event event) { log.info("********** [ClusterRemovedEventListener] Received: " + event.getClass() + " **********"); ClusterRemovedEvent clusterRemovedEvent = (ClusterRemovedEvent) event; - - Set<CartridgeSubscription> cartridgeSubscriptions = - getCartridgeSubscription(clusterRemovedEvent.getClusterId()); - - if(cartridgeSubscriptions != null) { - - // iterate - for (CartridgeSubscription cartridgeSubscription : cartridgeSubscriptions) { - //add the information to Topology Cluster Info. model - TopologyClusterInformationModel.getInstance().removeCluster(cartridgeSubscription.getSubscriber().getTenantId(), - cartridgeSubscription.getType(), cartridgeSubscription.getAlias()); - } - } + TopologyClusterInformationModel.getInstance().removeCluster(clusterRemovedEvent.getClusterId()); } - });*/ + }); //Instance Spawned event listner @@ -164,29 +140,18 @@ public class StratosManagerTopologyReceiver implements Runnable { InstanceSpawnedEvent instanceSpawnedEvent = (InstanceSpawnedEvent) event; String clusterDomain = instanceSpawnedEvent.getClusterId(); - Set<CartridgeSubscription> cartridgeSubscriptions = getCartridgeSubscription(clusterDomain); - - if(cartridgeSubscriptions != null) { - + String serviceType = instanceSpawnedEvent.getServiceName(); //acquire read lock TopologyManager.acquireReadLock(); try { Cluster cluster = TopologyManager.getTopology().getService(serviceType).getCluster(clusterDomain); - - for (CartridgeSubscription cartridgeSubscription : cartridgeSubscriptions) { - - TopologyClusterInformationModel.getInstance().addCluster(cartridgeSubscription.getSubscriber().getTenantId(), - cartridgeSubscription.getType(), cartridgeSubscription.getAlias(), cluster); - } - + TopologyClusterInformationModel.getInstance().addCluster(cluster); } finally { //release 
read lock TopologyManager.releaseReadLock(); - } - } - + } } }); @@ -200,28 +165,18 @@ public class StratosManagerTopologyReceiver implements Runnable { MemberStartedEvent memberStartedEvent = (MemberStartedEvent) event; String clusterDomain = memberStartedEvent.getClusterId(); - Set<CartridgeSubscription> cartridgeSubscriptions = getCartridgeSubscription(clusterDomain); - - if(cartridgeSubscriptions != null) { - + String serviceType = memberStartedEvent.getServiceName(); //acquire read lock TopologyManager.acquireReadLock(); try { Cluster cluster = TopologyManager.getTopology().getService(serviceType).getCluster(clusterDomain); - - for (CartridgeSubscription cartridgeSubscription : cartridgeSubscriptions) { - - TopologyClusterInformationModel.getInstance().addCluster(cartridgeSubscription.getSubscriber().getTenantId(), - cartridgeSubscription.getType(), cartridgeSubscription.getAlias(), cluster); - } - + TopologyClusterInformationModel.getInstance().addCluster(cluster); } finally { //release read lock TopologyManager.releaseReadLock(); } - } } }); @@ -236,9 +191,6 @@ public class StratosManagerTopologyReceiver implements Runnable { MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event; String clusterDomain = memberActivatedEvent.getClusterId(); - Set<CartridgeSubscription> cartridgeSubscriptions = getCartridgeSubscription(clusterDomain); - - if(cartridgeSubscriptions != null) { String serviceType = memberActivatedEvent.getServiceName(); //acquire read lock @@ -246,19 +198,11 @@ public class StratosManagerTopologyReceiver implements Runnable { try { Cluster cluster = TopologyManager.getTopology().getService(serviceType).getCluster(clusterDomain); - - for (CartridgeSubscription cartridgeSubscription : cartridgeSubscriptions) { - - TopologyClusterInformationModel.getInstance().addCluster(cartridgeSubscription.getSubscriber().getTenantId(), - cartridgeSubscription.getType(), cartridgeSubscription.getAlias(), cluster); - } - + 
TopologyClusterInformationModel.getInstance().addCluster(cluster); } finally { //release read lock TopologyManager.releaseReadLock(); - } - } - + } } }); @@ -272,9 +216,6 @@ public class StratosManagerTopologyReceiver implements Runnable { MemberSuspendedEvent memberSuspendedEvent = (MemberSuspendedEvent) event; String clusterDomain = memberSuspendedEvent.getClusterId(); - Set<CartridgeSubscription> cartridgeSubscriptions = getCartridgeSubscription(clusterDomain); - - if(cartridgeSubscriptions != null) { String serviceType = memberSuspendedEvent.getServiceName(); //acquire read lock @@ -282,19 +223,12 @@ public class StratosManagerTopologyReceiver implements Runnable { try { Cluster cluster = TopologyManager.getTopology().getService(serviceType).getCluster(clusterDomain); - - for (CartridgeSubscription cartridgeSubscription : cartridgeSubscriptions) { - - TopologyClusterInformationModel.getInstance().addCluster(cartridgeSubscription.getSubscriber().getTenantId(), - cartridgeSubscription.getType(), cartridgeSubscription.getAlias(), cluster); - } + TopologyClusterInformationModel.getInstance().addCluster(cluster); } finally { //release read lock TopologyManager.releaseReadLock(); } - } - } }); @@ -308,9 +242,6 @@ public class StratosManagerTopologyReceiver implements Runnable { MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event; String clusterDomain = memberTerminatedEvent.getClusterId(); - Set<CartridgeSubscription> cartridgeSubscriptions = getCartridgeSubscription(clusterDomain); - - if(cartridgeSubscriptions != null) { String serviceType = memberTerminatedEvent.getServiceName(); //acquire read lock @@ -345,35 +276,17 @@ public class StratosManagerTopologyReceiver implements Runnable { TopologyManager.releaseWriteLock(); } } - - for (CartridgeSubscription cartridgeSubscription : cartridgeSubscriptions) { - - TopologyClusterInformationModel.getInstance().addCluster(cartridgeSubscription.getSubscriber().getTenantId(), - 
cartridgeSubscription.getType(), cartridgeSubscription.getAlias(), cluster); - } - + TopologyClusterInformationModel.getInstance().addCluster(cluster); } finally { //release read lock TopologyManager.releaseReadLock(); } - } - } }); return processorChain; } - private Set<CartridgeSubscription> getCartridgeSubscription(String clusterDomain) { - - try { - return new DataInsertionAndRetrievalManager().getCartridgeSubscriptionForCluster(clusterDomain); - - } catch (Exception e) { - log.error("Error getting subscription information for cluster " + clusterDomain, e); - return null; - } - } @Override public void run() {
