Updated Branches: refs/heads/master 93fbed761 -> ba477892b
adding logic to handle multiple Subscriptions for a cluster Id Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/f0e04b57 Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/f0e04b57 Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/f0e04b57 Branch: refs/heads/master Commit: f0e04b57e3a1ff7dca39bfeb3189affa321458c8 Parents: f6e1352 Author: Isuru <[email protected]> Authored: Wed Jan 8 17:48:54 2014 +0530 Committer: Isuru <[email protected]> Committed: Wed Jan 8 17:48:54 2014 +0530 ---------------------------------------------------------------------- .../listener/InstanceStatusListener.java | 27 +++-- .../manager/lookup/ClusterIdToSubscription.java | 61 ++++++++-- .../manager/lookup/LookupDataHolder.java | 7 +- .../manager/lookup/SubscriptionContext.java | 2 +- .../manager/CartridgeSubscriptionManager.java | 23 ++-- .../DataInsertionAndRetrievalManager.java | 7 +- .../SubscriptionMultiTenantBehaviour.java | 36 +++--- .../SubscriptionSingleTenantBehaviour.java | 46 +++++++ .../StratosManagerTopologyReceiver.java | 119 ++++++++++++------- 9 files changed, 229 insertions(+), 99 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f0e04b57/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/listener/InstanceStatusListener.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/listener/InstanceStatusListener.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/listener/InstanceStatusListener.java index 69ff2cf..78338c1 100644 --- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/listener/InstanceStatusListener.java +++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/listener/InstanceStatusListener.java @@ -30,6 +30,7 @@ import org.apache.stratos.messaging.util.Util; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; +import java.util.Set; public class InstanceStatusListener implements MessageListener { @@ -70,15 +71,23 @@ public class InstanceStatusListener implements MessageListener { ". Not sending the Depsync event"); }*/ //////////////////////////////////////////////////////////////////////////////////////////////////////////////// - CartridgeSubscription cartridgeSubscription = new DataInsertionAndRetrievalManager().getCartridgeSubscription(clusterId); - if (cartridgeSubscription.getRepository() != null) { - ArtifactUpdatePublisher publisher = new ArtifactUpdatePublisher(cartridgeSubscription.getRepository(), clusterId, - String.valueOf(cartridgeSubscription.getSubscriber().getTenantId())); - publisher.publish(); - } else { - //TODO: make this log debug - log.info("No repository found for subscription with alias: " + cartridgeSubscription.getAlias() + ", type: " + cartridgeSubscription.getType()+ - ". Not sending the Depsync event"); + Set<CartridgeSubscription> cartridgeSubscriptions = new DataInsertionAndRetrievalManager().getCartridgeSubscription(clusterId); + + for (CartridgeSubscription cartridgeSubscription : cartridgeSubscriptions) { + // If only this is a non-multitenant Cartridge Subscription and repository is not null, need to + // send an ArtifactUpdatedEvent event. If this is a multitenant cartridge, sending this event + // will be done in SubscriptionMultiTenantBehaviour#createSubscription method + if (!cartridgeSubscription.getCartridgeInfo().getMultiTenant() && cartridgeSubscription.getRepository() != null) { + ArtifactUpdatePublisher publisher = new ArtifactUpdatePublisher(cartridgeSubscription.getRepository(), clusterId, + String.valueOf(cartridgeSubscription.getSubscriber().getTenantId())); + publisher.publish(); + + } else { + if(log.isDebugEnabled()) { + log.debug("No repository found for subscription with alias: " + cartridgeSubscription.getAlias() + ", type: " + cartridgeSubscription.getType()+ + ". Not sending the Artifact Updated event"); + } + } } } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f0e04b57/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/lookup/ClusterIdToSubscription.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/lookup/ClusterIdToSubscription.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/lookup/ClusterIdToSubscription.java index 6b85c68..d9f0cf2 100644 --- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/lookup/ClusterIdToSubscription.java +++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/lookup/ClusterIdToSubscription.java @@ -24,36 +24,81 @@ import org.apache.commons.logging.LogFactory; import org.apache.stratos.manager.subscription.CartridgeSubscription; import java.io.Serializable; -import java.util.HashMap; -import java.util.Map; +import java.util.*; public class ClusterIdToSubscription implements Serializable { private static final Log log = LogFactory.getLog(ClusterIdToSubscription.class); - // Map: Cluster Id (Domain) -> CartridgeSubscription - private Map<String, CartridgeSubscription> clusterIdToCartridgeSubscription; + // Map: Cluster Id (Domain) -> Set<CartridgeSubscription> + private Map<String, Set<CartridgeSubscription>> clusterIdToCartridgeSubscription; public ClusterIdToSubscription() { - clusterIdToCartridgeSubscription = new HashMap<String, CartridgeSubscription>(); + clusterIdToCartridgeSubscription = new HashMap<String, Set<CartridgeSubscription>>(); } public void addSubscription (CartridgeSubscription cartridgeSubscription) { - clusterIdToCartridgeSubscription.put(cartridgeSubscription.getClusterDomain(), cartridgeSubscription); + //clusterIdToCartridgeSubscription.put(cartridgeSubscription.getClusterDomain(), cartridgeSubscription); + String clusterDomain = cartridgeSubscription.getClusterDomain(); + if (clusterIdToCartridgeSubscription.containsKey(clusterDomain)) { + Set<CartridgeSubscription> existingSubscriptions = clusterIdToCartridgeSubscription.get(clusterDomain); + // if an existing subscription is present, remove it + if(existingSubscriptions.remove(cartridgeSubscription)){ + if(log.isDebugEnabled()) { + log.debug("Removed the existing Cartridge Subscription for cluster id " + clusterDomain + " in [Cluster Id -> Set<CartridgeSubscription>] map"); + } + } + // add or update + existingSubscriptions.add(cartridgeSubscription); + if(log.isDebugEnabled()) { + log.debug("Added Cartridge Subscription for cluster id " + clusterDomain + " in [Cluster Id -> Set<CartridgeSubscription>] map"); + } + + } else { + // create a new set and add it + Set<CartridgeSubscription> subscriptions = new HashSet<CartridgeSubscription>(); + subscriptions.add(cartridgeSubscription); + clusterIdToCartridgeSubscription.put(clusterDomain, subscriptions); + } } - public CartridgeSubscription getSubscription (String clusterId) { + public Set<CartridgeSubscription> getSubscription (String clusterId) { return clusterIdToCartridgeSubscription.get(clusterId); } public void removeSubscription (String clusterId) { - if (clusterIdToCartridgeSubscription.remove(clusterId) != null) { + /*if (clusterIdToCartridgeSubscription.remove(clusterId) != null) { if (log.isDebugEnabled()) { log.debug("Deleted the subscription for cluster " + clusterId + " from [Cluster Id -> CartridgeSubscription] map"); } + }*/ + // remove Subscription from clusterIdToCartridgeSubscription map + Set<CartridgeSubscription> existingSubscriptions = clusterIdToCartridgeSubscription.get(clusterId); + if (existingSubscriptions != null && !existingSubscriptions.isEmpty()) { + // iterate through the set + Iterator<CartridgeSubscription> iterator = existingSubscriptions.iterator(); + while (iterator.hasNext()) { + CartridgeSubscription cartridgeSubscription = iterator.next(); + // if a matching CartridgeSubscription is found, remove + if (cartridgeSubscription.getClusterDomain().equals(clusterId)) { + iterator.remove(); + if (log.isDebugEnabled()) { + log.debug("Deleted the subscription for cluster id " + clusterId + " from [Cluster Id -> Set<CartridgeSubscription>] map"); + } + break; + } + } + } + + // if the Subscriptions set is empty now, remove it from cartridgeTypeToSubscriptions map + if (existingSubscriptions != null && existingSubscriptions.isEmpty()) { + clusterIdToCartridgeSubscription.remove(clusterId); + if (log.isDebugEnabled()) { + log.debug("Deleted the subscriptions set for cluster id " + clusterId + " from [Cluster Id -> Set<CartridgeSubscription>] map"); + } } } } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f0e04b57/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/lookup/LookupDataHolder.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/lookup/LookupDataHolder.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/lookup/LookupDataHolder.java index 3dffbe6..64717b6 100644 --- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/lookup/LookupDataHolder.java +++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/lookup/LookupDataHolder.java @@ -25,6 +25,7 @@ import org.apache.stratos.manager.subscription.CartridgeSubscription; import java.io.Serializable; import java.util.Collection; +import java.util.Set; import java.util.concurrent.locks.ReentrantReadWriteLock; public class LookupDataHolder implements Serializable { @@ -59,12 +60,12 @@ public class LookupDataHolder implements Serializable { public void putSubscription (CartridgeSubscription cartridgeSubscription) { - if (clusterIdToSubscription.getSubscription(cartridgeSubscription.getClusterDomain()) != null) { + /*if (clusterIdToSubscription.getSubscription(cartridgeSubscription.getClusterDomain()) != null) { if(log.isDebugEnabled()) { log.debug("Overwriting the existing CartridgeSubscription for cluster " + cartridgeSubscription.getClusterDomain() + " in [Cluster Id -> CartridgeSubscription] map"); } - } + }*/ // add or update clusterIdToSubscription.addSubscription(cartridgeSubscription); @@ -123,7 +124,7 @@ public class LookupDataHolder implements Serializable { } - public CartridgeSubscription getSubscription (String clusterId) { + public Set<CartridgeSubscription> getSubscription (String clusterId) { return clusterIdToSubscription.getSubscription(clusterId); http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f0e04b57/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/lookup/SubscriptionContext.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/lookup/SubscriptionContext.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/lookup/SubscriptionContext.java index 77f8872..8b59af7 100644 --- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/lookup/SubscriptionContext.java +++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/lookup/SubscriptionContext.java @@ -113,7 +113,7 @@ public class SubscriptionContext implements Serializable { } // if the Subscriptions set is empty now, remove it from cartridgeTypeToSubscriptions map - if (existingSubscriptions.isEmpty()) { + if (existingSubscriptions != null && existingSubscriptions.isEmpty()) { cartridgeTypeToSubscriptions.remove(type); if (log.isDebugEnabled()) { log.debug("Deleted the subscriptions set for type " + type + " from [Type -> Set<CartridgeSubscription>] map"); http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f0e04b57/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/manager/CartridgeSubscriptionManager.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/manager/CartridgeSubscriptionManager.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/manager/CartridgeSubscriptionManager.java index bf436e5..a8aa6fc 100644 --- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/manager/CartridgeSubscriptionManager.java +++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/manager/CartridgeSubscriptionManager.java @@ -22,14 +22,12 @@ package org.apache.stratos.manager.manager; import org.apache.axis2.AxisFault; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.stratos.cloud.controller.pojo.CartridgeInfo; +import org.apache.stratos.cloud.controller.pojo.Property; import org.apache.stratos.manager.client.CloudControllerServiceClient; import org.apache.stratos.manager.dao.CartridgeSubscriptionInfo; import org.apache.stratos.manager.dto.SubscriptionInfo; import org.apache.stratos.manager.exception.*; -import org.apache.stratos.manager.payload.BasicPayloadData; -import org.apache.stratos.manager.payload.PayloadData; -import org.apache.stratos.manager.payload.PayloadFactory; -import org.apache.stratos.manager.publisher.ArtifactUpdatePublisher; import org.apache.stratos.manager.repository.Repository; import org.apache.stratos.manager.retriever.DataInsertionAndRetrievalManager; import org.apache.stratos.manager.subscriber.Subscriber; @@ -41,13 +39,9 @@ import org.apache.stratos.manager.subscription.tenancy.SubscriptionTenancyBehavi import org.apache.stratos.manager.subscription.utils.CartridgeSubscriptionUtils; import org.apache.stratos.manager.utils.ApplicationManagementUtil; import org.apache.stratos.manager.utils.CartridgeConstants; -import org.apache.stratos.cloud.controller.pojo.CartridgeInfo; -import org.apache.stratos.cloud.controller.pojo.Property; import org.wso2.carbon.context.CarbonContext; import java.util.Collection; -import java.util.Map; -import java.util.Set; /** * Manager class for the purpose of managing CartridgeSubscriptionInfo subscriptions, groupings, etc. @@ -176,8 +170,10 @@ public class CartridgeSubscriptionManager { cartridgeType + ", Repo URL: " + repositoryURL + ", Policy: " + autoscalingPolicyName); + //moved to SubscriptionSingleTenantBehaviour#createSubscription method + //Create the payload - BasicPayloadData basicPayloadData = CartridgeSubscriptionUtils.createBasicPayload(cartridgeSubscription); + /*BasicPayloadData basicPayloadData = CartridgeSubscriptionUtils.createBasicPayload(cartridgeSubscription); //Populate the basic payload details basicPayloadData.populatePayload(); @@ -210,12 +206,15 @@ public class CartridgeSubscriptionManager { } cartridgeSubscription.setPayloadData(payloadData); + */ // Publish tenant subscribed event to message broker CartridgeSubscriptionUtils.publishTenantSubscribedEvent(cartridgeSubscription.getSubscriber().getTenantId(), cartridgeSubscription.getCartridgeInfo().getType()); - - if(cartridgeInfo.getMultiTenant()) { + + //moved to SubscriptionMultiTenantBehaviour#createSubscription method + + /*if(cartridgeInfo.getMultiTenant()) { log.info(" Multitenant --> Publishing Artifact update event -- "); log.info(" Values : cluster id - " + cartridgeSubscription.getClusterDomain() + " tenant - " + cartridgeSubscription.getSubscriber().getTenantId()); @@ -223,7 +222,7 @@ public class CartridgeSubscriptionManager { cartridgeSubscription.getClusterDomain(), // clusterId String.valueOf(cartridgeSubscription.getSubscriber().getTenantId())); publisher.publish(); - } + }*/ return cartridgeSubscription; } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f0e04b57/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/retriever/DataInsertionAndRetrievalManager.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/retriever/DataInsertionAndRetrievalManager.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/retriever/DataInsertionAndRetrievalManager.java index 90aa5cc..9d988bd 100644 --- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/retriever/DataInsertionAndRetrievalManager.java +++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/retriever/DataInsertionAndRetrievalManager.java @@ -29,6 +29,7 @@ import org.apache.stratos.manager.persistence.RegistryBasedPersistenceManager; import org.apache.stratos.manager.subscription.CartridgeSubscription; import java.util.Collection; +import java.util.Set; public class DataInsertionAndRetrievalManager { @@ -238,13 +239,13 @@ public class DataInsertionAndRetrievalManager { } } - public CartridgeSubscription getCartridgeSubscription (String clusterId) { + public Set<CartridgeSubscription> getCartridgeSubscription (String clusterId) { // acquire read lock LookupDataHolder.getInstance().acquireReadLock(); try { - CartridgeSubscription cartridgeSubscription = LookupDataHolder.getInstance().getSubscription(clusterId); + Set<CartridgeSubscription> cartridgeSubscriptions = LookupDataHolder.getInstance().getSubscription(clusterId); /*if (cartridgeSubscription == null) { // not available in the cache, look in the registry if (log.isDebugEnabled()) { @@ -264,7 +265,7 @@ public class DataInsertionAndRetrievalManager { // LookupDataHolder.getInstance().putSubscription(cartridgeSubscription); }*/ - return cartridgeSubscription; + return cartridgeSubscriptions; } finally { // release read lock http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f0e04b57/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/tenancy/SubscriptionMultiTenantBehaviour.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/tenancy/SubscriptionMultiTenantBehaviour.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/tenancy/SubscriptionMultiTenantBehaviour.java index 8084468..00caf16 100644 --- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/tenancy/SubscriptionMultiTenantBehaviour.java +++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/tenancy/SubscriptionMultiTenantBehaviour.java @@ -21,14 +21,15 @@ package org.apache.stratos.manager.subscription.tenancy; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.stratos.cloud.controller.pojo.Properties; import org.apache.stratos.manager.exception.ADCException; import org.apache.stratos.manager.exception.AlreadySubscribedException; import org.apache.stratos.manager.exception.NotSubscribedException; import org.apache.stratos.manager.exception.UnregisteredCartridgeException; +import org.apache.stratos.manager.publisher.ArtifactUpdatePublisher; import org.apache.stratos.manager.retriever.DataInsertionAndRetrievalManager; import org.apache.stratos.manager.subscription.CartridgeSubscription; import org.apache.stratos.manager.utils.CartridgeConstants; -import org.apache.stratos.cloud.controller.pojo.Properties; public class SubscriptionMultiTenantBehaviour extends SubscriptionTenancyBehaviour { @@ -68,24 +69,23 @@ public class SubscriptionMultiTenantBehaviour extends SubscriptionTenancyBehavio } } - //TODO: implement getting cluster Id from DB - /*TopologyManager.acquireReadLock(); - - try { - Service service = TopologyManager.getTopology().getService(cartridgeSubscription.getType()); - if(service == null) { - TopologyManager.releaseReadLock(); - String errorMsg = "Error in subscribing, no service found with the name " + cartridgeSubscription.getType(); - log.error(errorMsg); - throw new ADCException(errorMsg); + if (cartridgeSubscription.getRepository() != null) { + + // publish the ArtifactUpdated event + log.info(" Multitenant --> Publishing Artifact update event -- "); + log.info(" Values : cluster id - " + cartridgeSubscription.getClusterDomain() + " tenant - " + + cartridgeSubscription.getSubscriber().getTenantId()); + ArtifactUpdatePublisher publisher = new ArtifactUpdatePublisher(cartridgeSubscription.getRepository(), + cartridgeSubscription.getClusterDomain(), // clusterId + String.valueOf(cartridgeSubscription.getSubscriber().getTenantId())); + publisher.publish(); + + } else { + if(log.isDebugEnabled()) { + log.debug("No repository found for subscription with alias: " + cartridgeSubscription.getAlias() + ", type: " + cartridgeSubscription.getType()+ + ". Not sending the Artifact Updated event"); } - - //cartridgeSubscription.getCluster().setClusterDomain(service.getCluster().); - //cartridgeSubscription.getCluster().setClusterSubDomain(domainContext.getSubDomain()); - - } finally { - TopologyManager.releaseReadLock(); - }*/ + } } public void registerSubscription(CartridgeSubscription cartridgeSubscription, Properties properties) http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f0e04b57/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/tenancy/SubscriptionSingleTenantBehaviour.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/tenancy/SubscriptionSingleTenantBehaviour.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/tenancy/SubscriptionSingleTenantBehaviour.java index cab0bac..cde0655 100644 --- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/tenancy/SubscriptionSingleTenantBehaviour.java +++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/tenancy/SubscriptionSingleTenantBehaviour.java @@ -22,14 +22,24 @@ package org.apache.stratos.manager.subscription.tenancy; import org.apache.axis2.AxisFault; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.stratos.cloud.controller.pojo.CartridgeInfo; +import org.apache.stratos.cloud.controller.pojo.Property; import org.apache.stratos.manager.client.CloudControllerServiceClient; import org.apache.stratos.manager.exception.ADCException; import org.apache.stratos.manager.exception.AlreadySubscribedException; import org.apache.stratos.manager.exception.NotSubscribedException; import org.apache.stratos.manager.exception.UnregisteredCartridgeException; +import org.apache.stratos.manager.payload.BasicPayloadData; +import org.apache.stratos.manager.payload.PayloadData; +import org.apache.stratos.manager.payload.PayloadFactory; import org.apache.stratos.manager.subscription.CartridgeSubscription; +import org.apache.stratos.manager.subscription.utils.CartridgeSubscriptionUtils; import org.apache.stratos.manager.utils.ApplicationManagementUtil; import org.apache.stratos.cloud.controller.pojo.Properties; +import org.apache.stratos.manager.utils.CartridgeConstants; + +import java.util.Map; +import java.util.Set; public class SubscriptionSingleTenantBehaviour extends SubscriptionTenancyBehaviour { @@ -45,6 +55,42 @@ public class SubscriptionSingleTenantBehaviour extends SubscriptionTenancyBehavi cartridgeSubscription.getCluster().getHostName() + "." + cartridgeSubscription.getType() + ".domain"); cartridgeSubscription.getCluster().setHostName(cartridgeSubscription.getAlias() + "." + cartridgeSubscription.getCluster().getHostName()); + + //Create the payload + BasicPayloadData basicPayloadData = CartridgeSubscriptionUtils.createBasicPayload(cartridgeSubscription); + //Populate the basic payload details + basicPayloadData.populatePayload(); + + CartridgeInfo cartridgeInfo = cartridgeSubscription.getCartridgeInfo(); + PayloadData payloadData = PayloadFactory.getPayloadDataInstance(cartridgeInfo.getProvider(), + cartridgeInfo.getType(), basicPayloadData); + + // get the payload parameters defined in the cartridge definition file for this cartridge type + if (cartridgeInfo.getProperties() != null && cartridgeInfo.getProperties().length != 0) { + + for (Property property : cartridgeInfo.getProperties()) { + // check if a property is related to the payload. Currently this is done by checking if the + // property name starts with 'payload_parameter.' suffix. If so the payload param name will + // be taken as the substring from the index of '.' to the end of the property name. + if (property.getName() + .startsWith(CartridgeConstants.CUSTOM_PAYLOAD_PARAM_NAME_PREFIX)) { + String payloadParamName = property.getName(); + payloadData.add(payloadParamName.substring(payloadParamName.indexOf(".") + 1), property.getValue()); + } + } + } + + //check if there are any custom payload entries defined + if (cartridgeSubscription.getCustomPayloadEntries() != null) { + //add them to the payload + Map<String, String> customPayloadEntries = cartridgeSubscription.getCustomPayloadEntries(); + Set<Map.Entry<String,String>> entrySet = customPayloadEntries.entrySet(); + for (Map.Entry<String, String> entry : entrySet) { + payloadData.add(entry.getKey(), entry.getValue()); + } + } + + cartridgeSubscription.setPayloadData(payloadData); } public void registerSubscription(CartridgeSubscription cartridgeSubscription, Properties properties) throws ADCException, UnregisteredCartridgeException { http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f0e04b57/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyReceiver.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyReceiver.java index 6dc827d..f475db5 100644 --- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyReceiver.java +++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyReceiver.java @@ -34,6 +34,8 @@ import org.apache.stratos.messaging.message.receiver.topology.TopologyEventMessa import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; import org.apache.stratos.messaging.message.receiver.topology.TopologyReceiver; +import java.util.Set; + public class StratosManagerTopologyReceiver implements Runnable { private static final Log log = LogFactory.getLog(StratosManagerTopologyReceiver.class); @@ -69,12 +71,15 @@ public class StratosManagerTopologyReceiver implements Runnable { //iterate through all clusters for (Cluster cluster : service.getClusters()) { //get subscription details - CartridgeSubscription cartridgeSubscription = getCartridgeSubscription(cluster.getClusterId()); - - if(cartridgeSubscription != null) { - //add the information to Topology Cluster Info. model - TopologyClusterInformationModel.getInstance().addCluster(cartridgeSubscription.getSubscriber().getTenantId(), - cartridgeSubscription.getType(), cartridgeSubscription.getAlias(), cluster); + Set<CartridgeSubscription> cartridgeSubscriptions = getCartridgeSubscription(cluster.getClusterId()); + + if(cartridgeSubscriptions != null) { + // iterate and do the relevant changes + for (CartridgeSubscription cartridgeSubscription : cartridgeSubscriptions) { + //add the information to Topology Cluster Info. model + TopologyClusterInformationModel.getInstance().addCluster(cartridgeSubscription.getSubscriber().getTenantId(), + cartridgeSubscription.getType(), cartridgeSubscription.getAlias(), cluster); + } } } } @@ -93,21 +98,26 @@ public class StratosManagerTopologyReceiver implements Runnable { ClusterCreatedEvent clustercreatedEvent = (ClusterCreatedEvent) event; //get subscription details - CartridgeSubscription cartridgeSubscription = - getCartridgeSubscription(clustercreatedEvent.getClusterId()); + Set<CartridgeSubscription> cartridgeSubscriptions = getCartridgeSubscription(clustercreatedEvent.getClusterId()); - if(cartridgeSubscription != null) { + if(cartridgeSubscriptions != null) { Cluster cluster; //acquire read lock TopologyManager.acquireReadLock(); + try { - cluster = TopologyManager.getTopology(). - getService(cartridgeSubscription.getType()).getCluster(cartridgeSubscription.getClusterDomain()); - //add the information to Topology Cluster Info. model - TopologyClusterInformationModel.getInstance().addCluster(cartridgeSubscription.getSubscriber().getTenantId(), - cartridgeSubscription.getType(), cartridgeSubscription.getAlias(), cluster); + // iterate and do the relevant changes + for (CartridgeSubscription cartridgeSubscription : cartridgeSubscriptions) { + //add the information to Topology Cluster Info. model + cluster = TopologyManager.getTopology(). + getService(cartridgeSubscription.getType()).getCluster(cartridgeSubscription.getClusterDomain()); + + //add the information to Topology Cluster Info. model + TopologyClusterInformationModel.getInstance().addCluster(cartridgeSubscription.getSubscriber().getTenantId(), + cartridgeSubscription.getType(), cartridgeSubscription.getAlias(), cluster); + } } finally { //release read lock @@ -126,13 +136,17 @@ public class StratosManagerTopologyReceiver implements Runnable { ClusterRemovedEvent clusterRemovedEvent = (ClusterRemovedEvent) event; - CartridgeSubscription cartridgeSubscription = + Set<CartridgeSubscription> cartridgeSubscriptions = getCartridgeSubscription(clusterRemovedEvent.getClusterId()); - if(cartridgeSubscription != null) { - //add the information to Topology Cluster Info. model - TopologyClusterInformationModel.getInstance().removeCluster(cartridgeSubscription.getSubscriber().getTenantId(), - cartridgeSubscription.getType(), cartridgeSubscription.getAlias()); + if(cartridgeSubscriptions != null) { + + // iterate + for (CartridgeSubscription cartridgeSubscription : cartridgeSubscriptions) { + //add the information to Topology Cluster Info. model + TopologyClusterInformationModel.getInstance().removeCluster(cartridgeSubscription.getSubscriber().getTenantId(), + cartridgeSubscription.getType(), cartridgeSubscription.getAlias()); + } } } }); @@ -147,20 +161,23 @@ public class StratosManagerTopologyReceiver implements Runnable { MemberStartedEvent memberStartedEvent = (MemberStartedEvent) event; String clusterDomain = memberStartedEvent.getClusterId(); - CartridgeSubscription cartridgeSubscription = getCartridgeSubscription(clusterDomain); + Set<CartridgeSubscription> cartridgeSubscriptions = getCartridgeSubscription(clusterDomain); - if(cartridgeSubscription != null) { + if(cartridgeSubscriptions != null) { Cluster cluster; //acquire read lock TopologyManager.acquireReadLock(); try { - cluster = TopologyManager.getTopology(). - getService(cartridgeSubscription.getType()).getCluster(cartridgeSubscription.getClusterDomain()); + for (CartridgeSubscription cartridgeSubscription : cartridgeSubscriptions) { + + cluster = TopologyManager.getTopology(). + getService(cartridgeSubscription.getType()).getCluster(cartridgeSubscription.getClusterDomain()); - TopologyClusterInformationModel.getInstance().addCluster(cartridgeSubscription.getSubscriber().getTenantId(), - cartridgeSubscription.getType(), cartridgeSubscription.getAlias(), cluster); + TopologyClusterInformationModel.getInstance().addCluster(cartridgeSubscription.getSubscriber().getTenantId(), + cartridgeSubscription.getType(), cartridgeSubscription.getAlias(), cluster); + } } finally { //release read lock @@ -181,21 +198,25 @@ public class StratosManagerTopologyReceiver implements Runnable { MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event; String clusterDomain = memberActivatedEvent.getClusterId(); - CartridgeSubscription cartridgeSubscription = getCartridgeSubscription(clusterDomain); + Set<CartridgeSubscription> cartridgeSubscriptions = getCartridgeSubscription(clusterDomain); - if(cartridgeSubscription != null) { + if(cartridgeSubscriptions != null) { Cluster cluster; //acquire read lock TopologyManager.acquireReadLock(); try { - cluster = TopologyManager.getTopology(). - getService(cartridgeSubscription.getType()).getCluster(cartridgeSubscription.getClusterDomain()); - TopologyClusterInformationModel.getInstance().addCluster(cartridgeSubscription.getSubscriber().getTenantId(), - cartridgeSubscription.getType(), - cartridgeSubscription.getAlias(), cluster); + for (CartridgeSubscription cartridgeSubscription : cartridgeSubscriptions) { + + cluster = TopologyManager.getTopology(). + getService(cartridgeSubscription.getType()).getCluster(cartridgeSubscription.getClusterDomain()); + + TopologyClusterInformationModel.getInstance().addCluster(cartridgeSubscription.getSubscriber().getTenantId(), + cartridgeSubscription.getType(), + cartridgeSubscription.getAlias(), cluster); + } } finally { //release read lock @@ -216,20 +237,24 @@ public class StratosManagerTopologyReceiver implements Runnable { MemberSuspendedEvent memberSuspendedEvent = (MemberSuspendedEvent) event; String clusterDomain = memberSuspendedEvent.getClusterId(); - CartridgeSubscription cartridgeSubscription = getCartridgeSubscription(clusterDomain); + Set<CartridgeSubscription> cartridgeSubscriptions = getCartridgeSubscription(clusterDomain); - if(cartridgeSubscription != null) { + if(cartridgeSubscriptions != null) { Cluster cluster; //acquire read lock TopologyManager.acquireReadLock(); try { - cluster = TopologyManager.getTopology(). - getService(cartridgeSubscription.getType()).getCluster(cartridgeSubscription.getClusterDomain()); - TopologyClusterInformationModel.getInstance().addCluster(cartridgeSubscription.getSubscriber().getTenantId(), - cartridgeSubscription.getType(), cartridgeSubscription.getAlias(), cluster); + for (CartridgeSubscription cartridgeSubscription : cartridgeSubscriptions) { + + cluster = TopologyManager.getTopology(). + getService(cartridgeSubscription.getType()).getCluster(cartridgeSubscription.getClusterDomain()); + + TopologyClusterInformationModel.getInstance().addCluster(cartridgeSubscription.getSubscriber().getTenantId(), + cartridgeSubscription.getType(), cartridgeSubscription.getAlias(), cluster); + } } finally { //release read lock @@ -250,20 +275,24 @@ public class StratosManagerTopologyReceiver implements Runnable { MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event; String clusterDomain = memberTerminatedEvent.getClusterId(); - CartridgeSubscription cartridgeSubscription = getCartridgeSubscription(clusterDomain); + Set<CartridgeSubscription> cartridgeSubscriptions = getCartridgeSubscription(clusterDomain); - if(cartridgeSubscription != null) { + if(cartridgeSubscriptions != null) { Cluster cluster; //acquire read lock TopologyManager.acquireReadLock(); try { - cluster = TopologyManager.getTopology(). - getService(cartridgeSubscription.getType()).getCluster(cartridgeSubscription.getClusterDomain()); - TopologyClusterInformationModel.getInstance().addCluster(cartridgeSubscription.getSubscriber().getTenantId(), - cartridgeSubscription.getType(), cartridgeSubscription.getAlias(), cluster); + for (CartridgeSubscription cartridgeSubscription : cartridgeSubscriptions) { + + cluster = TopologyManager.getTopology(). + getService(cartridgeSubscription.getType()).getCluster(cartridgeSubscription.getClusterDomain()); + + TopologyClusterInformationModel.getInstance().addCluster(cartridgeSubscription.getSubscriber().getTenantId(), + cartridgeSubscription.getType(), cartridgeSubscription.getAlias(), cluster); + } } finally { //release read lock @@ -277,7 +306,7 @@ public class StratosManagerTopologyReceiver implements Runnable { return processorChain; } - private CartridgeSubscription getCartridgeSubscription(String clusterDomain) { + private Set<CartridgeSubscription> getCartridgeSubscription(String clusterDomain) { try { return new DataInsertionAndRetrievalManager().getCartridgeSubscription(clusterDomain);
