http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedMessageProcessor.java index 67c9b67..4473add 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedMessageProcessor.java @@ -30,6 +30,7 @@ import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilte import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter; import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; import org.apache.stratos.messaging.util.Util; public class MemberSuspendedMessageProcessor extends MessageProcessor { @@ -54,96 +55,108 @@ public class MemberSuspendedMessageProcessor extends MessageProcessor { // Parse complete message and build event MemberSuspendedEvent event = (MemberSuspendedEvent) Util.jsonToObject(message, MemberSuspendedEvent.class); - // Apply service filter - if (TopologyServiceFilter.getInstance().isActive()) { - if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) { - // Service is excluded, do not update topology or fire event - if (log.isDebugEnabled()) { - 
log.debug(String.format("Service is excluded: [service] %s", event.getServiceName())); - } - return false; - } + TopologyManager.acquireWriteLockForCluster(event.getServiceName(), event.getClusterId()); + try { + return doProcess(event, topology); + + } finally { + TopologyManager.releaseWriteLockForCluster(event.getServiceName(), event.getClusterId()); } - // Apply cluster filter - if (TopologyClusterFilter.getInstance().isActive()) { - if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) { - // Cluster is excluded, do not update topology or fire event - if (log.isDebugEnabled()) { - log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId())); - } - return false; - } + } else { + if (nextProcessor != null) { + // ask the next processor to take care of the message. + return nextProcessor.process(type, message, topology); + } else { + throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); } + } + } + + private boolean doProcess (MemberSuspendedEvent event,Topology topology) { - // Validate event against the existing topology - Service service = topology.getService(event.getServiceName()); - if (service == null) { - if (log.isWarnEnabled()) { - log.warn(String.format("Service does not exist: [service] %s", - event.getServiceName())); + // Apply service filter + if (TopologyServiceFilter.getInstance().isActive()) { + if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) { + // Service is excluded, do not update topology or fire event + if (log.isDebugEnabled()) { + log.debug(String.format("Service is excluded: [service] %s", event.getServiceName())); } return false; } - Cluster cluster = service.getCluster(event.getClusterId()); - if (cluster == null) { - if (log.isWarnEnabled()) { - log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s", - event.getServiceName(), 
event.getClusterId())); + } + + // Apply cluster filter + if (TopologyClusterFilter.getInstance().isActive()) { + if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) { + // Cluster is excluded, do not update topology or fire event + if (log.isDebugEnabled()) { + log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId())); } return false; } - Member member = cluster.getMember(event.getMemberId()); - if (member == null) { - if (log.isWarnEnabled()) { - log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s", - event.getServiceName(), - event.getClusterId(), - event.getMemberId())); - } - return false; + } + + // Validate event against the existing topology + Service service = topology.getService(event.getServiceName()); + if (service == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Service does not exist: [service] %s", + event.getServiceName())); } + return false; + } + Cluster cluster = service.getCluster(event.getClusterId()); + if (cluster == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s", + event.getServiceName(), event.getClusterId())); + } + return false; + } + Member member = cluster.getMember(event.getMemberId()); + if (member == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s", + event.getServiceName(), + event.getClusterId(), + event.getMemberId())); + } + return false; + } - // Apply member filter - if(TopologyMemberFilter.getInstance().isActive()) { - if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) { - if (log.isDebugEnabled()) { - log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId())); - } - return false; + // Apply member filter + if(TopologyMemberFilter.getInstance().isActive()) { + 
if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) { + if (log.isDebugEnabled()) { + log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId())); } + return false; } + } - if (member.getStatus() == MemberStatus.Suspended) { - if (log.isWarnEnabled()) { - log.warn(String.format("Member already suspended: [service] %s [cluster] %s [member] %s", - event.getServiceName(), - event.getClusterId(), - event.getMemberId())); - } - } else { - - // Apply changes to the topology - member.setStatus(MemberStatus.Suspended); - - if (log.isInfoEnabled()) { - log.info(String.format("Member suspended: [service] %s [cluster] %s [member] %s", - event.getServiceName(), - event.getClusterId(), - event.getMemberId())); - } + if (member.getStatus() == MemberStatus.Suspended) { + if (log.isWarnEnabled()) { + log.warn(String.format("Member already suspended: [service] %s [cluster] %s [member] %s", + event.getServiceName(), + event.getClusterId(), + event.getMemberId())); } + } else { + // Apply changes to the topology + member.setStatus(MemberStatus.Suspended); - notifyEventListeners(event); - return true; - } else { - if (nextProcessor != null) { - // ask the next processor to take care of the message. - return nextProcessor.process(type, message, topology); - } else { - throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); + if (log.isInfoEnabled()) { + log.info(String.format("Member suspended: [service] %s [cluster] %s [member] %s", + event.getServiceName(), + event.getClusterId(), + event.getMemberId())); } } + + + notifyEventListeners(event); + return true; } }
http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedMessageProcessor.java index 5b5cbc9..3619b53 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedMessageProcessor.java @@ -29,6 +29,7 @@ import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilte import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter; import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; import org.apache.stratos.messaging.util.Util; public class MemberTerminatedMessageProcessor extends MessageProcessor { @@ -53,87 +54,99 @@ public class MemberTerminatedMessageProcessor extends MessageProcessor { // Parse complete message and build event MemberTerminatedEvent event = (MemberTerminatedEvent) Util.jsonToObject(message, MemberTerminatedEvent.class); - // Apply service filter - if (TopologyServiceFilter.getInstance().isActive()) { - if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) { - // Service is excluded, do not update topology or fire event - if (log.isDebugEnabled()) { - 
log.debug(String.format("Service is excluded: [service] %s", event.getServiceName())); - } - return false; - } + TopologyManager.acquireWriteLockForCluster(event.getServiceName(), event.getClusterId()); + try { + return doProcess(event, topology); + + } finally { + TopologyManager.releaseWriteLockForCluster(event.getServiceName(), event.getClusterId()); } - // Apply cluster filter - if (TopologyClusterFilter.getInstance().isActive()) { - if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) { - // Cluster is excluded, do not update topology or fire event - if (log.isDebugEnabled()) { - log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId())); - } - return false; - } + } else { + if (nextProcessor != null) { + // ask the next processor to take care of the message. + return nextProcessor.process(type, message, topology); + } else { + throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); } + } + } + + private boolean doProcess (MemberTerminatedEvent event,Topology topology) { - // Validate event against the existing topology - Service service = topology.getService(event.getServiceName()); - if (service == null) { - if (log.isWarnEnabled()) { - log.warn(String.format("Service does not exist: [service] %s", event.getServiceName())); + // Apply service filter + if (TopologyServiceFilter.getInstance().isActive()) { + if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) { + // Service is excluded, do not update topology or fire event + if (log.isDebugEnabled()) { + log.debug(String.format("Service is excluded: [service] %s", event.getServiceName())); } return false; } - Cluster cluster = service.getCluster(event.getClusterId()); - if (cluster == null) { - if (log.isWarnEnabled()) { - log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s", - event.getServiceName(), 
event.getClusterId())); + } + + // Apply cluster filter + if (TopologyClusterFilter.getInstance().isActive()) { + if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) { + // Cluster is excluded, do not update topology or fire event + if (log.isDebugEnabled()) { + log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId())); } return false; } - Member member = cluster.getMember(event.getMemberId()); - if(member != null) { - // Apply member filter - if(TopologyMemberFilter.getInstance().isActive()) { - if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) { - if (log.isDebugEnabled()) { - log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId())); - } - return false; + } + + // Validate event against the existing topology + Service service = topology.getService(event.getServiceName()); + if (service == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Service does not exist: [service] %s", event.getServiceName())); + } + return false; + } + Cluster cluster = service.getCluster(event.getClusterId()); + if (cluster == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s", + event.getServiceName(), event.getClusterId())); + } + return false; + } + Member member = cluster.getMember(event.getMemberId()); + if(member != null) { + // Apply member filter + if(TopologyMemberFilter.getInstance().isActive()) { + if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) { + if (log.isDebugEnabled()) { + log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId())); } + return false; } } + } - // Notify event listeners before removing member object - notifyEventListeners(event); + // Notify event listeners before removing member object + notifyEventListeners(event); - if (member == null) { - if (log.isWarnEnabled()) { - 
log.warn(String.format("Member already terminated: [service] %s [cluster] %s [member] %s", - event.getServiceName(), - event.getClusterId(), - event.getMemberId())); - } - } else { - // Remove member from the cluster - cluster.removeMember(member); - - if (log.isInfoEnabled()) { - log.info(String.format("Member terminated: [service] %s [cluster] %s [member] %s", - event.getServiceName(), - event.getClusterId(), - event.getMemberId())); - } + if (member == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Member already terminated: [service] %s [cluster] %s [member] %s", + event.getServiceName(), + event.getClusterId(), + event.getMemberId())); } - - return true; } else { - if (nextProcessor != null) { - // ask the next processor to take care of the message. - return nextProcessor.process(type, message, topology); - } else { - throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); + // Remove member from the cluster + cluster.removeMember(member); + + if (log.isInfoEnabled()) { + log.info(String.format("Member terminated: [service] %s [cluster] %s [member] %s", + event.getServiceName(), + event.getClusterId(), + event.getMemberId())); } } + + return true; } } http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedMessageProcessor.java index 2c216f0..1c4be8f 100644 --- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedMessageProcessor.java @@ -25,6 +25,7 @@ import org.apache.stratos.messaging.domain.topology.Topology; import org.apache.stratos.messaging.event.topology.ServiceCreatedEvent; import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter; import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; import org.apache.stratos.messaging.util.Util; public class ServiceCreatedMessageProcessor extends MessageProcessor { @@ -43,44 +44,21 @@ public class ServiceCreatedMessageProcessor extends MessageProcessor { if (ServiceCreatedEvent.class.getName().equals(type)) { // Return if topology has not been initialized - if (!topology.isInitialized()) + if (!topology.isInitialized()) { return false; + } // Parse complete message and build event ServiceCreatedEvent event = (ServiceCreatedEvent) Util.jsonToObject(message, ServiceCreatedEvent.class); - // Apply service filter - if (TopologyServiceFilter.getInstance().isActive()) { - if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) { - // Service is excluded, do not update topology or fire event - if (log.isDebugEnabled()) { - log.debug(String.format("Service is excluded: [service] %s", event.getServiceName())); - } - return false; - } - } + TopologyManager.acquireWriteLockForServices(); + try { + return doProcess(event, topology); - // Validate event against the existing topology - if (topology.serviceExists(event.getServiceName())) { - if (log.isWarnEnabled()) { - log.warn(String.format("Service already created: [service] %s", event.getServiceName())); - } - } else { - - // Apply changes to the topology - Service 
service = new Service(event.getServiceName(), event.getServiceType()); - service.addPorts(event.getPorts()); - topology.addService(service); - - if (log.isInfoEnabled()) { - log.info(String.format("Service created: [service] %s", event.getServiceName())); - } + } finally { + TopologyManager.releaseWriteLockForServices(); } - - notifyEventListeners(event); - return true; - } else { if (nextProcessor != null) { // ask the next processor to take care of the message. @@ -90,4 +68,40 @@ public class ServiceCreatedMessageProcessor extends MessageProcessor { } } } + + private boolean doProcess (ServiceCreatedEvent event, Topology topology) { + + // Apply service filter + if (TopologyServiceFilter.getInstance().isActive()) { + if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) { + // Service is excluded, do not update topology or fire event + if (log.isDebugEnabled()) { + log.debug(String.format("Service is excluded: [service] %s", event.getServiceName())); + } + return false; + } + } + + // Validate event against the existing topology + if (topology.serviceExists(event.getServiceName())) { + if (log.isWarnEnabled()) { + log.warn(String.format("Service already created: [service] %s", event.getServiceName())); + } + } else { + + // Apply changes to the topology + Service service = new Service(event.getServiceName(), event.getServiceType()); + service.addPorts(event.getPorts()); + topology.addService(service); + + if (log.isInfoEnabled()) { + log.info(String.format("Service created: [service] %s", event.getServiceName())); + } + } + + + notifyEventListeners(event); + return true; + + } } http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedMessageProcessor.java ---------------------------------------------------------------------- diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedMessageProcessor.java index 2c0bc70..a38cbdc 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedMessageProcessor.java @@ -25,6 +25,7 @@ import org.apache.stratos.messaging.domain.topology.Topology; import org.apache.stratos.messaging.event.topology.ServiceRemovedEvent; import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter; import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; import org.apache.stratos.messaging.util.Util; public class ServiceRemovedMessageProcessor extends MessageProcessor { @@ -49,38 +50,14 @@ public class ServiceRemovedMessageProcessor extends MessageProcessor { // Parse complete message and build event ServiceRemovedEvent event = (ServiceRemovedEvent) Util.jsonToObject(message, ServiceRemovedEvent.class); - // Apply service filter - if (TopologyServiceFilter.getInstance().isActive()) { - if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) { - // Service is excluded, do not update topology or fire event - if (log.isDebugEnabled()) { - log.debug(String.format("Service is excluded: [service] %s", event.getServiceName())); - } - return false; - } - } - - // Notify event listeners before removing service object - notifyEventListeners(event); + TopologyManager.acquireWriteLockForServices(); + try { + return doProcess(event, topology); - // Validate event against the existing topology - Service 
service = topology.getService(event.getServiceName()); - if (service == null) { - if (log.isWarnEnabled()) { - log.warn(String.format("Service does not exist: [service] %s", - event.getServiceName())); - } - } else { - - // Apply changes to the topology - topology.removeService(service); - - if (log.isInfoEnabled()) { - log.info(String.format("Service removed: [service] %s", event.getServiceName())); - } + } finally { + TopologyManager.releaseWriteLockForServices(); } - return true; } else { if (nextProcessor != null) { // ask the next processor to take care of the message. @@ -90,4 +67,40 @@ public class ServiceRemovedMessageProcessor extends MessageProcessor { } } } + + private boolean doProcess (ServiceRemovedEvent event, Topology topology) { + + // Apply service filter + if (TopologyServiceFilter.getInstance().isActive()) { + if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) { + // Service is excluded, do not update topology or fire event + if (log.isDebugEnabled()) { + log.debug(String.format("Service is excluded: [service] %s", event.getServiceName())); + } + return false; + } + } + + // Notify event listeners before removing service object + notifyEventListeners(event); + + // Validate event against the existing topology + Service service = topology.getService(event.getServiceName()); + if (service == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Service does not exist: [service] %s", + event.getServiceName())); + } + } else { + + // Apply changes to the topology + topology.removeService(service); + + if (log.isInfoEnabled()) { + log.info(String.format("Service removed: [service] %s", event.getServiceName())); + } + } + + return true; + } } http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java 
---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java index e3ddfa3..db9e8b1 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java @@ -47,7 +47,7 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain { private MemberSuspendedMessageProcessor memberSuspendedMessageProcessor; private MemberTerminatedMessageProcessor memberTerminatedMessageProcessor; private GroupActivatedProcessor groupActivatedProcessor; - private CompositeApplicationRemovedMessageProcessor compositeApplicationRemovedMessageProcessor; + //private CompositeApplicationRemovedMessageProcessor compositeApplicationRemovedMessageProcessor; private ApplicationCreatedMessageProcessor applicationCreatedMessageProcessor; private ApplicationRemovedMessageProcessor applicationRemovedMessageProcessor; private ApplicationActivatedMessageProcessor applicationActivatedMessageProcessor; @@ -109,11 +109,6 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain { add(applicationActivatedMessageProcessor); if (log.isDebugEnabled()) { - log.debug("Grouping: added applicationCreatedMessageProcessor, applicationRemovedMessageProcessor: " + - applicationCreatedMessageProcessor + " / " + applicationRemovedMessageProcessor); - } - - if (log.isDebugEnabled()) { log.debug("Topology message processor chain initialized X1"); } } @@ -153,9 +148,6 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain { 
applicationCreatedMessageProcessor.addEventListener(eventListener); } else if (eventListener instanceof ApplicationRemovedEventListener) { applicationRemovedMessageProcessor.addEventListener(eventListener); - if (log.isDebugEnabled()) { - log.debug("Grouping: added eventlistener to applicationCreatedMessageProcessor: " + eventListener); - } } else if (eventListener instanceof ApplicationActivatedEventListener) { applicationActivatedMessageProcessor.addEventListener(eventListener); } else { http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java index 9cc8f78..218c441 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java @@ -69,15 +69,15 @@ class TopologyEventMessageDelegator implements Runnable { log.debug(String.format("Topology event message [%s] received from queue: %s", type, messageQueue.getClass())); } - try { - TopologyManager.acquireWriteLock(); +// try { +// TopologyManager.acquireWriteLock(); if (log.isDebugEnabled()) { log.debug(String.format("Delegating topology event message: %s", type)); } processorChain.process(type, json, TopologyManager.getTopology()); - } finally { - TopologyManager.releaseWriteLock(); - } +// } finally { +// TopologyManager.releaseWriteLock(); +// } } catch (Exception 
e) { log.error("Failed to retrieve topology event message", e); http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyManager.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyManager.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyManager.java index 5df66bd..2ffd7f6 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyManager.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyManager.java @@ -21,7 +21,13 @@ package org.apache.stratos.messaging.message.receiver.topology; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.domain.topology.Application; +import org.apache.stratos.messaging.domain.topology.ClusterDataHolder; import org.apache.stratos.messaging.domain.topology.Topology; +import org.apache.stratos.messaging.domain.topology.locking.TopologyLock; +import org.apache.stratos.messaging.domain.topology.locking.TopologyLockHierarchy; + +import java.util.Set; import java.util.concurrent.locks.ReentrantReadWriteLock; /** @@ -30,43 +36,454 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; * Usage: * Acquire a relevant lock and invoke the getTopology() method inside a try block. * Once processing is done release the lock using a finally block. + * + * Acquiring Locks: + * + * Stratos supports hierarchical locking. As per the practice, we need to lock the + * hierarchy from root level till the relevant sub tree. 
+ * + * Acquire a write lock: + * + * From root level, acquire read lock, and acquire a write lock only for the + * relevant sub tree. + * + * Acquire a read lock: + * + * From root level, acquire read locks till the relevant sub tree + * + * Examples - + * + * Example 1: Acquiring write lock for a Cluster to modify the Cluster object - + * acquiring: + * 1. acquire read lock for all Services + * 2. acquire read lock for the particular Service, to which the cluster belongs + * 3. acquire write lock for the Cluster + * + * releasing: + * 1. release write lock for the Cluster + * 2. release read lock for the particular Service + * 3. release read lock for all Services + * + * Example 2: Acquiring write lock to add a new Cluster object - + * acquiring: + * 1. acquire read lock for all Services + * 2. acquire write lock for the particular Service, to which the cluster belongs + * + * releasing: + * 1. release write lock for the particular Service + * 2. release read lock for all Services + * + * Example 3: Acquiring read lock to read Cluster information + * acquiring: + * 1. acquire read lock for all Services + * 2. acquire read lock for the particular Service, to which the cluster belongs + * 3. acquire read lock for the relevant Cluster + * + * releasing: + * 1. release read lock for the relevant Cluster + * 2. release read lock for the particular Service + * 3. release read lock for all Services + * + * Example 4: Acquiring the write lock to deploy a Cartridge (add a new Service) + * acquire: + * 1. acquire write lock for all Services + * + * release: + * 1. 
release write lock for all Services */ public class TopologyManager { private static final Log log = LogFactory.getLog(TopologyManager.class); private static volatile Topology topology; + private static final TopologyLockHierarchy topologyLockHierarchy = + TopologyLockHierarchy.getInstance(); private static volatile ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); private static volatile ReentrantReadWriteLock.ReadLock readLock = lock.readLock(); private static volatile ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock(); + + // Top level locks - should be used to lock the entire Topology + public static void acquireReadLock() { if(log.isDebugEnabled()) { - log.debug("Read lock acquired"); + log.debug("Read lock acquired for Topology"); } readLock.lock(); } public static void releaseReadLock() { if(log.isDebugEnabled()) { - log.debug("Read lock released"); + log.debug("Read lock released for Topology"); } readLock.unlock(); } public static void acquireWriteLock() { if(log.isDebugEnabled()) { - log.debug("Write lock acquired"); + log.debug("Write lock acquired for Topology"); } writeLock.lock(); } public static void releaseWriteLock() { if(log.isDebugEnabled()) { - log.debug("Write lock released"); + log.debug("Write lock released for Topology"); } writeLock.unlock(); } + // Application, Service and Cluster read locks + + public static void acquireReadLockForApplications() { + if(log.isDebugEnabled()) { + log.debug("Read lock acquired for Applications"); + } + topologyLockHierarchy.getApplicatioLock().acquireReadLock(); + } + + public static void releaseReadLockForApplications() { + if(log.isDebugEnabled()) { + log.debug("Read lock released for Applications"); + } + topologyLockHierarchy.getApplicatioLock().releaseReadLock(); + } + + public static void acquireReadLockForServices() { + if(log.isDebugEnabled()) { + log.debug("Read lock acquired for Services"); + } + topologyLockHierarchy.getServiceLock().acquireReadLock(); + } + + public 
static void releaseReadLockForServices() { + if(log.isDebugEnabled()) { + log.debug("Read lock released for Services"); + } + topologyLockHierarchy.getServiceLock().releaseReadLock(); + } + + // Application, Service and Cluster write locks + + public static void acquireWriteLockForApplications() { + if(log.isDebugEnabled()) { + log.debug("Write lock acquired for Applications"); + } + topologyLockHierarchy.getApplicatioLock().acquireWriteLock(); + } + + public static void releaseWriteLockForApplications() { + if(log.isDebugEnabled()) { + log.debug("Write lock released for Applications"); + } + topologyLockHierarchy.getApplicatioLock().releaseWritelock(); + } + + public static void acquireWriteLockForServices() { + if(log.isDebugEnabled()) { + log.debug("Write lock acquired for Services"); + } + topologyLockHierarchy.getServiceLock().acquireWriteLock(); + } + + public static void releaseWriteLockForServices() { + if(log.isDebugEnabled()) { + log.debug("Write lock released for Services"); + } + topologyLockHierarchy.getServiceLock().releaseWritelock(); + } + + public static void acquireReadLockForService (String serviceName) { + + // acquire read lock for all Services + acquireReadLockForServices(); + + TopologyLock topologyServiceLock = topologyLockHierarchy.getTopologyLockForService(serviceName); + if (topologyServiceLock == null) { + handleLockNotFound("Topology lock not found for Service " + serviceName); + + } else { + topologyServiceLock.acquireReadLock(); + if(log.isDebugEnabled()) { + log.debug("Read lock acquired for Service " + serviceName); + } + } + } + + public static void releaseReadLockForService (String serviceName) { + + TopologyLock topologyServiceLock = topologyLockHierarchy.getTopologyLockForService(serviceName); + if (topologyServiceLock == null) { + handleLockNotFound("Topology lock not found for Service " + serviceName); + + } else { + topologyServiceLock.releaseReadLock(); + if(log.isDebugEnabled()) { + log.debug("Read lock released for Service 
" + serviceName); + } + } + + // release read lock for all Services + releaseReadLockForServices(); + } + + public static void acquireWriteLockForService (String serviceName) { + + // acquire read lock for all Applications + acquireReadLockForServices(); + + TopologyLock topologyServiceLock = topologyLockHierarchy.getTopologyLockForService(serviceName); + if (topologyServiceLock == null) { + handleLockNotFound("Topology lock not found for Service " + serviceName); + + } else { + topologyServiceLock.acquireWriteLock(); + if(log.isDebugEnabled()) { + log.debug("Write lock acquired for Service " + serviceName); + } + } + } + + public static void releaseWriteLockForService (String serviceName) { + + TopologyLock topologyServiceLock = topologyLockHierarchy.getTopologyLockForService(serviceName); + if (topologyServiceLock == null) { + handleLockNotFound("Topology lock not found for Service " + serviceName); + + } else { + topologyServiceLock.releaseWritelock(); + if(log.isDebugEnabled()) { + log.debug("Write lock released for Service " + serviceName); + } + } + + // release read lock for all Services + releaseReadLockForServices(); + } + + public static void acquireReadLockForCluster (String serviceName, String clusterId) { + + // acquire read lock for the relevant Services + acquireReadLockForService(serviceName); + + TopologyLock topologyClusterLock = topologyLockHierarchy.getTopologyLockForCluster(clusterId); + if (topologyClusterLock == null) { + handleLockNotFound("Topology lock not found for Cluster " + clusterId); + + } else { + // acquire read lock for the relevant Cluster + topologyClusterLock.acquireReadLock(); + if(log.isDebugEnabled()) { + log.debug("Read lock acquired for Cluster " + clusterId); + } + } + } + + public static void releaseReadLockForCluster (String serviceName, String clusterId) { + + TopologyLock topologyClusterLock = topologyLockHierarchy.getTopologyLockForCluster(clusterId); + if (topologyClusterLock == null) { + 
handleLockNotFound("Topology lock not found for Cluster " + clusterId); + + } else { + // release read lock for the relevant Cluster + topologyClusterLock.releaseReadLock(); + if(log.isDebugEnabled()) { + log.debug("Read lock released for Cluster " + clusterId); + } + } + + // release read lock for relevant Service + releaseReadLockForService(serviceName); + } + + public static void acquireWriteLockForCluster (String serviceName, String clusterId) { + + // acquire read lock for the relevant Services + acquireReadLockForService(serviceName); + + TopologyLock topologyClusterLock = topologyLockHierarchy.getTopologyLockForCluster(clusterId); + if (topologyClusterLock == null) { + handleLockNotFound("Topology lock not found for Cluster " + clusterId); + + } else { + topologyClusterLock.acquireWriteLock(); + if(log.isDebugEnabled()) { + log.debug("Write lock acquired for Cluster " + clusterId); + } + } + } + + public static void releaseWriteLockForCluster (String serviceName, String clusterId) { + + TopologyLock topologyClusterLock = topologyLockHierarchy.getTopologyLockForCluster(clusterId); + if (topologyClusterLock == null) { + handleLockNotFound("Topology lock not found for Cluster " + clusterId); + + } else { + topologyClusterLock.releaseWritelock(); + if(log.isDebugEnabled()) { + log.debug("Write lock released for Cluster " + clusterId); + } + } + + // release read lock for relevant Service + releaseReadLockForService(serviceName); + } + + public static void acquireReadLockForApplication (String appId) { + + // acquire read lock for all Applications + acquireReadLockForApplications(); + + // get the Application's cluster's and acquire read locks + Application application = topology.getApplication(appId); + if (application == null) { + log.warn("Application " + appId + " is not found in the Topology"); + + } else { + + Set<ClusterDataHolder> clusterData = application.getClusterDataRecursively(); + if (clusterData != null && !clusterData.isEmpty()) { + for 
(ClusterDataHolder clusterDataHolder : clusterData) { + // acquire read locks for services and clusters + acquireReadLockForCluster(clusterDataHolder.getServiceType(), clusterDataHolder.getClusterId()); + } + + } else { + if (log.isDebugEnabled()) { + log.debug("No Cluster Data found in Application " + appId); + } + } + } + + TopologyLock topologyAppLock = topologyLockHierarchy.getTopologyLockForApplication(appId); + if (topologyAppLock == null) { + handleLockNotFound("Topology lock not found for Application " + appId); + + } else { + // now, lock Application + topologyAppLock.acquireReadLock(); + if(log.isDebugEnabled()) { + log.debug("Read lock acquired for Application " + appId); + } + } + } + + public static void releaseReadLockForApplication (String appId) { + + TopologyLock topologyAppLock = topologyLockHierarchy.getTopologyLockForApplication(appId); + if (topologyAppLock == null) { + handleLockNotFound("Topology lock not found for Application " + appId); + + } else { + // release App lock + topologyAppLock.releaseReadLock(); + if(log.isDebugEnabled()) { + log.debug("Read lock released for Application " + appId); + } + } + + // release read lock for all Applications + releaseReadLockForApplications(); + + // get the Application's cluster information + Application application = topology.getApplication(appId); + if (application == null) { + log.warn("Application " + appId + " is not found in the Topology"); + + } else { + Set<ClusterDataHolder> clusterData = application.getClusterDataRecursively(); + if (clusterData != null && !clusterData.isEmpty()) { + for (ClusterDataHolder clusterDataHolder : clusterData) { + // release read locks for clusters and services + releaseReadLockForCluster(clusterDataHolder.getServiceType(), clusterDataHolder.getClusterId()); + } + } else { + if (log.isDebugEnabled()) { + log.debug("No Cluster Data found in Application " + appId); + } + } + } + } + + public static synchronized void acquireWriteLockForApplication (String appId) { 
+ + // acquire read lock for all Applications + acquireReadLockForApplications(); + + // get the Application's cluster's and acquire read locks + Application application = topology.getApplication(appId); + if (application == null) { + log.warn("Application " + appId + " is not found in the Topology"); + + } else { + Set<ClusterDataHolder> clusterData = application.getClusterDataRecursively(); + if (clusterData != null && !clusterData.isEmpty()) { + for (ClusterDataHolder clusterDataHolder : clusterData) { + acquireWriteLockForCluster(clusterDataHolder.getServiceType(), clusterDataHolder.getClusterId()); + } + } else { + if (log.isDebugEnabled()) { + log.debug("No Cluster Data found in Application " + appId); + } + } + } + + TopologyLock topologyAppLock = topologyLockHierarchy.getTopologyLockForApplication(appId); + if (topologyAppLock == null) { + handleLockNotFound("Topology lock not found for Application " + appId); + + } else { + // now, lock Application + topologyAppLock.acquireWriteLock(); + if(log.isDebugEnabled()) { + log.debug("Write lock acquired for Application " + appId); + } + } + } + + public static synchronized void releaseWriteLockForApplication (String appId) { + + TopologyLock topologyAppLock = topologyLockHierarchy.getTopologyLockForApplication(appId); + if (topologyAppLock == null) { + handleLockNotFound("Topology lock not found for Application " + appId); + + } else { + // release App lock + topologyAppLock.releaseWritelock(); + if(log.isDebugEnabled()) { + log.debug("Write lock released for Application " + appId); + } + } + + // release read lock for all Applications + releaseReadLockForApplications(); + + // get the Application's cluster's and acquire read + Application application = topology.getApplication(appId); + if (application == null) { + log.warn("Application " + appId + " is not found in the Topology"); + + } else { + Set<ClusterDataHolder> clusterData = application.getClusterDataRecursively(); + if (clusterData != null && 
!clusterData.isEmpty()) { + for (ClusterDataHolder clusterDataHolder : clusterData) { + // release read locks for clusters and services + releaseWriteLockForCluster(clusterDataHolder.getServiceType(), clusterDataHolder.getClusterId()); + } + } else { + if (log.isDebugEnabled()) { + log.debug("No Cluster Data found in Application " + appId); + } + } + } + } + + private static void handleLockNotFound (String errorMsg) { + log.warn(errorMsg); + //throw new RuntimeException(errorMsg); + } + public static Topology getTopology() { if (topology == null) { synchronized (TopologyManager.class){
