http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java index c007343..94b9650 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java @@ -27,6 +27,7 @@ import org.apache.stratos.messaging.event.topology.ClusterCreatedEvent; import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter; import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; import org.apache.stratos.messaging.util.Util; public class ClusterCreatedMessageProcessor extends MessageProcessor { @@ -41,83 +42,97 @@ public class ClusterCreatedMessageProcessor extends MessageProcessor { @Override public boolean process(String type, String message, Object object) { - Topology topology = (Topology) object; + Topology topology = (Topology) object; if (ClusterCreatedEvent.class.getName().equals(type)) { // Return if topology has not been initialized - if (!topology.isInitialized()) + if (!topology.isInitialized()) { return false; + } // Parse complete message and build event ClusterCreatedEvent event = (ClusterCreatedEvent) Util.jsonToObject(message, ClusterCreatedEvent.class); - // Apply service filter - if (TopologyServiceFilter.getInstance().isActive()) { - if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) { - // Service is excluded, do not update topology or fire event - if (log.isDebugEnabled()) { - log.debug(String.format("Service is excluded: [service] %s", event.getServiceName())); - } - return false; - } - } + TopologyManager.acquireReadLockForServices(); + TopologyManager.acquireWriteLockForService(event.getServiceName()); + try { + return doProcess(event, topology); - // Apply cluster filter - if (TopologyClusterFilter.getInstance().isActive()) { - if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) { - // Cluster is excluded, do not update topology or fire event - if (log.isDebugEnabled()) { - log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId())); - } - return false; - } + } finally { + TopologyManager.releaseWriteLockForService(event.getServiceName()); + TopologyManager.releaseReadLockForServices(); } - // Validate event properties - Cluster cluster = event.getCluster(); - if(cluster == null) { - String msg = "Cluster object of cluster created event is null."; - log.error(msg); - throw new RuntimeException(msg); - } - if (cluster.getHostNames().isEmpty()) { - throw new RuntimeException("Host name/s not found in cluster created event"); + } else { + if (nextProcessor != null) { + // ask the next processor to take care of the message. + return nextProcessor.process(type, message, topology); + } else { + throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); } - // Validate event against the existing topology - Service service = topology.getService(event.getServiceName()); - if (service == null) { - if (log.isWarnEnabled()) { - log.warn(String.format("Service does not exist: [service] %s", - event.getServiceName())); + } + } + + private boolean doProcess (ClusterCreatedEvent event,Topology topology) { + + // Apply service filter + if (TopologyServiceFilter.getInstance().isActive()) { + if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) { + // Service is excluded, do not update topology or fire event + if (log.isDebugEnabled()) { + log.debug(String.format("Service is excluded: [service] %s", event.getServiceName())); } return false; } - if (service.clusterExists(event.getClusterId())) { - if (log.isWarnEnabled()) { - log.warn(String.format("Cluster already exists in service: [service] %s [cluster] %s", event.getServiceName(), - event.getClusterId())); - } - } else { - - // Apply changes to the topology - service.addCluster(cluster); - if (log.isInfoEnabled()) { - log.info(String.format("Cluster created: %s", - cluster.toString())); - } - } + } - // Notify event listeners - notifyEventListeners(event); - return true; + // Apply cluster filter + if (TopologyClusterFilter.getInstance().isActive()) { + if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) { + // Cluster is excluded, do not update topology or fire event + if (log.isDebugEnabled()) { + log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId())); + } + return false; + } + } + // Validate event properties + Cluster cluster = event.getCluster(); + if(cluster == null) { + String msg = "Cluster object of cluster created event is null."; + log.error(msg); + throw new RuntimeException(msg); + } + if (cluster.getHostNames().isEmpty()) { + throw new RuntimeException("Host name/s not found in cluster created event"); + } + // Validate event against the existing topology + Service service = topology.getService(event.getServiceName()); + if (service == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Service does not exist: [service] %s", + event.getServiceName())); + } + return false; + } + if (service.clusterExists(event.getClusterId())) { + if (log.isWarnEnabled()) { + log.warn(String.format("Cluster already exists in service: [service] %s [cluster] %s", event.getServiceName(), + event.getClusterId())); + } } else { - if (nextProcessor != null) { - // ask the next processor to take care of the message. - return nextProcessor.process(type, message, topology); - } else { - throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); + + // Apply changes to the topology + service.addCluster(cluster); + if (log.isInfoEnabled()) { + log.info(String.format("Cluster created: %s", + cluster.toString())); } } + + // Notify event listeners + notifyEventListeners(event); + return true; } }
http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterMaintenanceModeMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterMaintenanceModeMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterMaintenanceModeMessageProcessor.java index f125c54..8629363 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterMaintenanceModeMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterMaintenanceModeMessageProcessor.java @@ -25,6 +25,7 @@ import org.apache.stratos.messaging.event.topology.ClusterMaintenanceModeEvent; import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter; import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; import org.apache.stratos.messaging.util.Util; public class ClusterMaintenanceModeMessageProcessor extends MessageProcessor { @@ -49,64 +50,77 @@ public class ClusterMaintenanceModeMessageProcessor extends MessageProcessor { ClusterMaintenanceModeEvent event = (ClusterMaintenanceModeEvent) Util. jsonToObject(message, ClusterMaintenanceModeEvent.class); - // Apply service filter - if (TopologyServiceFilter.getInstance().isActive()) { - if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) { - // Service is excluded, do not update topology or fire event - if (log.isDebugEnabled()) { - log.debug(String.format("Service is excluded: [service] %s", event.getServiceName())); - } - return false; - } + TopologyManager.acquireReadLockForServices(); + TopologyManager.acquireWriteLockForService(event.getServiceName()); + try { + return doProcess(event, topology); + + } finally { + TopologyManager.releaseWriteLockForService(event.getServiceName()); + TopologyManager.releaseReadLockForServices(); } - // Apply cluster filter - if (TopologyClusterFilter.getInstance().isActive()) { - if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) { - // Cluster is excluded, do not update topology or fire event - if (log.isDebugEnabled()) { - log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId())); - } - return false; - } + } else { + if (nextProcessor != null) { + // ask the next processor to take care of the message. + return nextProcessor.process(type, message, topology); + } else { + throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); } + } + } + + private boolean doProcess (ClusterMaintenanceModeEvent event,Topology topology) { - // Validate event against the existing topology - Service service = topology.getService(event.getServiceName()); - if (service == null) { - if (log.isWarnEnabled()) { - log.warn(String.format("Service does not exist: [service] %s", - event.getServiceName())); + // Apply service filter + if (TopologyServiceFilter.getInstance().isActive()) { + if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) { + // Service is excluded, do not update topology or fire event + if (log.isDebugEnabled()) { + log.debug(String.format("Service is excluded: [service] %s", event.getServiceName())); } return false; } - Cluster cluster = service.getCluster(event.getClusterId()); + } - if (cluster == null) { - if (log.isWarnEnabled()) { - log.warn(String.format("Cluster not exists in service: [service] %s [cluster] %s", event.getServiceName(), - event.getClusterId())); + // Apply cluster filter + if (TopologyClusterFilter.getInstance().isActive()) { + if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) { + // Cluster is excluded, do not update topology or fire event + if (log.isDebugEnabled()) { + log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId())); } - } else { - // Apply changes to the topology - cluster.setStatus(Status.In_Maintenance); - if (log.isInfoEnabled()) { - log.info(String.format("Cluster updated as maintenance mode: %s", - cluster.toString())); - } - } + return false; + } + } - // Notify event listeners - notifyEventListeners(event); - return true; + // Validate event against the existing topology + Service service = topology.getService(event.getServiceName()); + if (service == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Service does not exist: [service] %s", + event.getServiceName())); + } + return false; + } + Cluster cluster = service.getCluster(event.getClusterId()); + if (cluster == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Cluster not exists in service: [service] %s [cluster] %s", event.getServiceName(), + event.getClusterId())); + } } else { - if (nextProcessor != null) { - // ask the next processor to take care of the message. - return nextProcessor.process(type, message, topology); - } else { - throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); + // Apply changes to the topology + cluster.setStatus(Status.In_Maintenance); + if (log.isInfoEnabled()) { + log.info(String.format("Cluster updated as maintenance mode: %s", + cluster.toString())); } } + + // Notify event listeners + notifyEventListeners(event); + return true; } } http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java index 69ef5b0..1dfb929 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java @@ -26,6 +26,7 @@ import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent; import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter; import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; import org.apache.stratos.messaging.util.Util; public class ClusterRemovedMessageProcessor extends MessageProcessor { @@ -50,65 +51,79 @@ public class ClusterRemovedMessageProcessor extends MessageProcessor { // Parse complete message and build event ClusterRemovedEvent event = (ClusterRemovedEvent) Util.jsonToObject(message, ClusterRemovedEvent.class); - // Apply service filter - if (TopologyServiceFilter.getInstance().isActive()) { - if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) { - // Service is excluded, do not update topology or fire event - if (log.isDebugEnabled()) { - log.debug(String.format("Service is excluded: [service] %s", event.getServiceName())); - } - return false; - } + TopologyManager.acquireReadLockForServices(); + TopologyManager.acquireWriteLockForService(event.getServiceName()); + try { + return doProcess(event, topology); + + } finally { + TopologyManager.releaseWriteLockForService(event.getServiceName()); + TopologyManager.releaseReadLockForServices(); } - // Apply cluster filter - if (TopologyClusterFilter.getInstance().isActive()) { - if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) { - // Cluster is excluded, do not update topology or fire event - if (log.isDebugEnabled()) { - log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId())); - } - return false; - } + } else { + if (nextProcessor != null) { + // ask the next processor to take care of the message. + return nextProcessor.process(type, message, topology); + } else { + throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); } + } + } + + private boolean doProcess (ClusterRemovedEvent event,Topology topology) { - // Validate event against the existing topology - Service service = topology.getService(event.getServiceName()); - if (service == null) { - if (log.isWarnEnabled()) { - log.warn(String.format("Service does not exist: [service] %s", event.getServiceName())); + // Apply service filter + if (TopologyServiceFilter.getInstance().isActive()) { + if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) { + // Service is excluded, do not update topology or fire event + if (log.isDebugEnabled()) { + log.debug(String.format("Service is excluded: [service] %s", event.getServiceName())); } return false; } + } - // Notify event listeners before removing the cluster object - notifyEventListeners(event); - - if (!service.clusterExists(event.getClusterId())) { - if (log.isWarnEnabled()) { - log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s", - event.getServiceName(), - event.getClusterId())); + // Apply cluster filter + if (TopologyClusterFilter.getInstance().isActive()) { + if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) { + // Cluster is excluded, do not update topology or fire event + if (log.isDebugEnabled()) { + log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId())); } - } else { - - // Apply changes to the topology - service.removeCluster(event.getClusterId()); - - if (log.isInfoEnabled()) { - log.info(String.format("Cluster removed from service: [service] %s [cluster] %s", - event.getServiceName(), event.getClusterId())); - } + return false; + } + } + + // Validate event against the existing topology + Service service = topology.getService(event.getServiceName()); + if (service == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Service does not exist: [service] %s", event.getServiceName())); } + return false; + } + + // Notify event listeners before removing the cluster object + notifyEventListeners(event); - return true; + if (!service.clusterExists(event.getClusterId())) { + if (log.isWarnEnabled()) { + log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s", + event.getServiceName(), + event.getClusterId())); + } } else { - if (nextProcessor != null) { - // ask the next processor to take care of the message. - return nextProcessor.process(type, message, topology); - } else { - throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); + + // Apply changes to the topology + service.removeCluster(event.getClusterId()); + + if (log.isInfoEnabled()) { + log.info(String.format("Cluster removed from service: [service] %s [cluster] %s", + event.getServiceName(), event.getClusterId())); } } + + return true; } } http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java index 135bdae..6d5cb8f 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java @@ -26,6 +26,7 @@ import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilte import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter; import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; import org.apache.stratos.messaging.util.Util; import java.util.ArrayList; @@ -49,102 +50,20 @@ public class CompleteTopologyMessageProcessor extends MessageProcessor { if (CompleteTopologyEvent.class.getName().equals(type)) { // Parse complete message and build event CompleteTopologyEvent event = (CompleteTopologyEvent) Util.jsonToObject(message, CompleteTopologyEvent.class); - - // if topology has not already initialized - if (!topology.isInitialized()) { - - // Apply service filter - if (TopologyServiceFilter.getInstance().isActive()) { - // Add services included in service filter - for (Service service : event.getTopology().getServices()) { - if (TopologyServiceFilter.getInstance() - .serviceNameIncluded(service.getServiceName())) { - topology.addService(service); - } else { - if (log.isDebugEnabled()) { - log.debug(String.format( - "Service is excluded: [service] %s", - service.getServiceName())); - } - } - } - } else { - // Add all services - topology.addServices(event.getTopology().getServices()); - } - - // Apply cluster filter - if (TopologyClusterFilter.getInstance().isActive()) { - for (Service service : topology.getServices()) { - List<Cluster> clustersToRemove = new ArrayList<Cluster>(); - for (Cluster cluster : service.getClusters()) { - if (TopologyClusterFilter.getInstance() - .clusterIdExcluded(cluster.getClusterId())) { - clustersToRemove.add(cluster); - } - } - for (Cluster cluster : clustersToRemove) { - service.removeCluster(cluster); - if (log.isDebugEnabled()) { - log.debug(String.format( - "Cluster is excluded: [cluster] %s", - cluster.getClusterId())); - } - } - } - } - - // Apply member filter - if (TopologyMemberFilter.getInstance().isActive()) { - for (Service service : topology.getServices()) { - for (Cluster cluster : service.getClusters()) { - List<Member> membersToRemove = new ArrayList<Member>(); - for (Member member : cluster.getMembers()) { - if (TopologyMemberFilter.getInstance() - .lbClusterIdExcluded( - member.getLbClusterId())) { - membersToRemove.add(member); - } - } - for (Member member : membersToRemove) { - cluster.removeMember(member); - if (log.isDebugEnabled()) { - log.debug(String - .format("Member is excluded: [member] %s [lb-cluster-id] %s", - member.getMemberId(), - member.getLbClusterId())); - } - } - } - } - } - - // add existing Applications to Topology - Collection<Application> applications = event.getTopology().getApplications(); - if (applications != null && !applications.isEmpty()) { - for (Application application : applications) { - topology.addApplication(application); - if (log.isDebugEnabled()) { - log.debug("Application with id [ " + application.getId() + " ] added to Topology"); - } - } - } else { - if (log.isDebugEnabled()) { - log.debug("No Application information found in Complete Topology event"); - } - } - if (log.isInfoEnabled()) { - log.info("Topology initialized"); - } + if (!topology.isInitialized()) { + TopologyManager.acquireWriteLock(); + + try { + return doProcess(event, topology); - // Set topology initialized - topology.setInitialized(true); - } + } finally { + TopologyManager.releaseWriteLock(); + } + } else { + return true; + } - // Notify event listeners - notifyEventListeners(event); - return true; } else { if (nextProcessor != null) { // ask the next processor to take care of the message. @@ -153,4 +72,99 @@ public class CompleteTopologyMessageProcessor extends MessageProcessor { return false; } } + + private boolean doProcess (CompleteTopologyEvent event, Topology topology) { + + // Apply service filter + if (TopologyServiceFilter.getInstance().isActive()) { + // Add services included in service filter + for (Service service : event.getTopology().getServices()) { + if (TopologyServiceFilter.getInstance() + .serviceNameIncluded(service.getServiceName())) { + topology.addService(service); + } else { + if (log.isDebugEnabled()) { + log.debug(String.format( + "Service is excluded: [service] %s", + service.getServiceName())); + } + } + } + } else { + // Add all services + topology.addServices(event.getTopology().getServices()); + } + + // Apply cluster filter + if (TopologyClusterFilter.getInstance().isActive()) { + for (Service service : topology.getServices()) { + List<Cluster> clustersToRemove = new ArrayList<Cluster>(); + for (Cluster cluster : service.getClusters()) { + if (TopologyClusterFilter.getInstance() + .clusterIdExcluded(cluster.getClusterId())) { + clustersToRemove.add(cluster); + } + } + for (Cluster cluster : clustersToRemove) { + service.removeCluster(cluster); + if (log.isDebugEnabled()) { + log.debug(String.format( + "Cluster is excluded: [cluster] %s", + cluster.getClusterId())); + } + } + } + } + + // Apply member filter + if (TopologyMemberFilter.getInstance().isActive()) { + for (Service service : topology.getServices()) { + for (Cluster cluster : service.getClusters()) { + List<Member> membersToRemove = new ArrayList<Member>(); + for (Member member : cluster.getMembers()) { + if (TopologyMemberFilter.getInstance() + .lbClusterIdExcluded( + member.getLbClusterId())) { + membersToRemove.add(member); + } + } + for (Member member : membersToRemove) { + cluster.removeMember(member); + if (log.isDebugEnabled()) { + log.debug(String + .format("Member is excluded: [member] %s [lb-cluster-id] %s", + member.getMemberId(), + member.getLbClusterId())); + } + } + } + } + } + + // add existing Applications to Topology + Collection<Application> applications = event.getTopology().getApplications(); + if (applications != null && !applications.isEmpty()) { + for (Application application : applications) { + topology.addApplication(application); + if (log.isDebugEnabled()) { + log.debug("Application with id [ " + application.getId() + " ] added to Topology"); + } + } + } else { + if (log.isDebugEnabled()) { + log.debug("No Application information found in Complete Topology event"); + } + } + + if (log.isInfoEnabled()) { + log.info("Topology initialized"); + } + + // Set topology initialized + topology.setInitialized(true); + + // Notify event listeners + notifyEventListeners(event); + return true; + } } http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupActivatedProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupActivatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupActivatedProcessor.java index 627d9a9..7200431 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupActivatedProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupActivatedProcessor.java @@ -21,11 +21,9 @@ package org.apache.stratos.messaging.message.processor.topology; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.messaging.domain.topology.*; -import org.apache.stratos.messaging.event.topology.ClusterActivatedEvent; import org.apache.stratos.messaging.event.topology.GroupActivatedEvent; -import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter; -import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter; import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; import org.apache.stratos.messaging.util.Util; /** @@ -53,34 +51,16 @@ public class GroupActivatedProcessor extends MessageProcessor { GroupActivatedEvent event = (GroupActivatedEvent) Util. jsonToObject(message, GroupActivatedEvent.class); - // Validate event against the existing topology - Application application = topology.getApplication(event.getAppId()); - if (application == null) { - if (log.isWarnEnabled()) { - log.warn(String.format("Application does not exist: [service] %s", - event.getAppId())); - } - return false; - } - Group group = application.getGroup(event.getGroupId()); + TopologyManager.acquireReadLockForApplications(); + TopologyManager.acquireWriteLockForApplication(event.getAppId()); - if (group == null) { - if (log.isWarnEnabled()) { - log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(), - event.getGroupId())); - } - } else { - // Apply changes to the topology - group.setStatus(Status.Activated); - if (log.isInfoEnabled()) { - log.info(String.format("Group updated as activated : %s", - group.toString())); - } - } + try { + return doProcess(event, topology); - // Notify event listeners - notifyEventListeners(event); - return true; + } finally { + TopologyManager.releaseWriteLockForApplication(event.getAppId()); + TopologyManager.releaseReadLockForApplications(); + } } else { if (nextProcessor != null) { @@ -91,4 +71,36 @@ public class GroupActivatedProcessor extends MessageProcessor { } } } + + private boolean doProcess (GroupActivatedEvent event,Topology topology) { + + // Validate event against the existing topology + Application application = topology.getApplication(event.getAppId()); + if (application == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Application does not exist: [service] %s", + event.getAppId())); + } + return false; + } + Group group = application.getGroup(event.getGroupId()); + + if (group == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(), + event.getGroupId())); + } + } else { + // Apply changes to the topology + group.setStatus(Status.Activated); + if (log.isInfoEnabled()) { + log.info(String.format("Group updated as activated : %s", + group.toString())); + } + } + + // Notify event listeners + notifyEventListeners(event); + return true; + } } http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedMessageProcessor.java index 8e4e1b1..2d3f8b3 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedMessageProcessor.java @@ -26,6 +26,7 @@ import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilte import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter; import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; import org.apache.stratos.messaging.util.Util; public class InstanceSpawnedMessageProcessor extends MessageProcessor { @@ -50,92 +51,103 @@ public class InstanceSpawnedMessageProcessor extends MessageProcessor { // Parse complete message and build event InstanceSpawnedEvent event = (InstanceSpawnedEvent) Util.jsonToObject(message, InstanceSpawnedEvent.class); - // Apply service filter - if (TopologyServiceFilter.getInstance().isActive()) { - if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) { - // Service is excluded, do not update topology or fire event - if (log.isDebugEnabled()) { - log.debug(String.format("Service is excluded: [service] %s", event.getServiceName())); - } - return false; - } + TopologyManager.acquireWriteLockForCluster(event.getServiceName(), event.getClusterId()); + try { + return doProcess(event, topology); + + } finally { + TopologyManager.releaseWriteLockForCluster(event.getServiceName(), event.getClusterId()); } - // Apply cluster filter - if (TopologyClusterFilter.getInstance().isActive()) { - if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) { - // Cluster is excluded, do not update topology or fire event - if (log.isDebugEnabled()) { - log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId())); - } - return false; - } + } else { + if (nextProcessor != null) { + // ask the next processor to take care of the message. + return nextProcessor.process(type, message, topology); + } else { + throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); } + } + } + + private boolean doProcess (InstanceSpawnedEvent event,Topology topology){ - // Apply member filter - if(TopologyMemberFilter.getInstance().isActive()) { - if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(event.getLbClusterId())) { - if (log.isDebugEnabled()) { - log.debug(String.format("Member is excluded: [lb-cluster-id] %s", event.getLbClusterId())); - } - return false; + // Apply service filter + if (TopologyServiceFilter.getInstance().isActive()) { + if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) { + // Service is excluded, do not update topology or fire event + if (log.isDebugEnabled()) { + log.debug(String.format("Service is excluded: [service] %s", event.getServiceName())); } + return false; } + } - // Validate event against the existing topology - Service service = topology.getService(event.getServiceName()); - if (service == null) { - if (log.isWarnEnabled()) { - log.warn(String.format("Service does not exist: [service] %s", - event.getServiceName())); + // Apply cluster filter + if (TopologyClusterFilter.getInstance().isActive()) { + if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) { + // Cluster is excluded, do not update topology or fire event + if (log.isDebugEnabled()) { + log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId())); } return false; } - Cluster cluster = service.getCluster(event.getClusterId()); - if (cluster == null) { - if (log.isWarnEnabled()) { - log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s", - event.getServiceName(), event.getClusterId())); + } + + // Apply member filter + if(TopologyMemberFilter.getInstance().isActive()) { + if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(event.getLbClusterId())) { + if (log.isDebugEnabled()) { + log.debug(String.format("Member is excluded: [lb-cluster-id] %s", event.getLbClusterId())); } return false; } - if (cluster.memberExists(event.getMemberId())) { - if (log.isWarnEnabled()) { - log.warn(String.format("Member already exists: [service] %s [cluster] %s [member] %s", - event.getServiceName(), - event.getClusterId(), - event.getMemberId())); - } - } else { - - // Apply changes to the topology - Member member = new Member(event.getServiceName(), event.getClusterId(), event.getNetworkPartitionId(), event.getPartitionId(), event.getMemberId()); - member.setStatus(MemberStatus.Created); - member.setMemberPublicIp(event.getMemberPublicIp()); - member.setMemberIp(event.getMemberIp()); - member.setLbClusterId(event.getLbClusterId()); - member.setProperties(event.getProperties()); - cluster.addMember(member); - - if (log.isInfoEnabled()) { - log.info(String.format("Member created: [service] %s [cluster] %s [member] %s", - event.getServiceName(), - event.getClusterId(), - event.getMemberId())); - } + } + + // Validate event against the existing topology + Service service = topology.getService(event.getServiceName()); + if (service == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Service does not exist: [service] %s", + event.getServiceName())); + } + return false; + } + Cluster cluster = service.getCluster(event.getClusterId()); + if (cluster == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s", + event.getServiceName(), event.getClusterId())); + } + return false; + } + if (cluster.memberExists(event.getMemberId())) { + if (log.isWarnEnabled()) { + log.warn(String.format("Member already exists: [service] %s [cluster] %s [member] %s", + event.getServiceName(), + event.getClusterId(), + event.getMemberId())); } + } else { + // Apply changes to the topology + Member member = new Member(event.getServiceName(), event.getClusterId(), event.getNetworkPartitionId(), event.getPartitionId(), event.getMemberId()); + member.setStatus(MemberStatus.Created); + member.setMemberPublicIp(event.getMemberPublicIp()); + member.setMemberIp(event.getMemberIp()); + member.setLbClusterId(event.getLbClusterId()); + member.setProperties(event.getProperties()); + cluster.addMember(member); - // Notify event listeners - notifyEventListeners(event); - return true; - } else { - if (nextProcessor != null) { - // ask the next processor to take care of the message. - return nextProcessor.process(type, message, topology); - } else { - throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); + if (log.isInfoEnabled()) { + log.info(String.format("Member created: [service] %s [cluster] %s [member] %s", + event.getServiceName(), + event.getClusterId(), + event.getMemberId())); } } + + // Notify event listeners + notifyEventListeners(event); + return true; } } http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java index a5d701d..ec1b5ec 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java @@ -30,6 +30,7 @@ import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilte import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter; import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; import org.apache.stratos.messaging.util.Util; public class MemberActivatedMessageProcessor extends MessageProcessor { @@ -54,111 +55,123 @@ public class MemberActivatedMessageProcessor extends MessageProcessor { // Parse complete message and build event MemberActivatedEvent event = (MemberActivatedEvent) Util.jsonToObject(message, MemberActivatedEvent.class); - // Apply service filter - if (TopologyServiceFilter.getInstance().isActive()) { - if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) { - // Service is excluded, do not update topology or fire event - if (log.isDebugEnabled()) { - log.debug(String.format("Service is excluded: [service] %s", event.getServiceName())); - } - return false; + TopologyManager.acquireWriteLockForCluster(event.getServiceName(), event.getClusterId()); + try { + return doProcess(event, topology); + + } finally { + TopologyManager.releaseWriteLockForCluster(event.getServiceName(), event.getClusterId()); + } + + } else { + if (nextProcessor != null) { + // ask the next processor to take care of the message. + return nextProcessor.process(type, message, topology); + } else { + throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); + } + } + } + + private boolean doProcess (MemberActivatedEvent event,Topology topology) { + + // Apply service filter + if (TopologyServiceFilter.getInstance().isActive()) { + if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) { + // Service is excluded, do not update topology or fire event + if (log.isDebugEnabled()) { + log.debug(String.format("Service is excluded: [service] %s", event.getServiceName())); } + return false; } + } - // Apply cluster filter - if (TopologyClusterFilter.getInstance().isActive()) { - if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) { - // Cluster is excluded, do not update topology or fire event - if (log.isDebugEnabled()) { - log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId())); - } - return false; + // Apply cluster filter + if (TopologyClusterFilter.getInstance().isActive()) { + if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) { + // Cluster is excluded, do not update topology or fire event + if (log.isDebugEnabled()) { + log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId())); } + return false; } + } - // Validate event properties - if ((event.getMemberIp() == null) || event.getMemberIp().isEmpty()) { - throw new RuntimeException(String.format("No ip address found in member activated event: [service] %s [cluster] %s [member] %s", - event.getServiceName(), - event.getClusterId(), - event.getMemberId())); + // Validate event properties + if ((event.getMemberIp() == null) || event.getMemberIp().isEmpty()) { + throw new RuntimeException(String.format("No ip address found in member activated event: [service] %s [cluster] %s [member] %s", + event.getServiceName(), + event.getClusterId(), + event.getMemberId())); + } + if ((event.getPorts() == null) || (event.getPorts().size() == 0)) { + throw new RuntimeException(String.format("No ports found in member activated event: [service] %s [cluster] %s [member] %s", + event.getServiceName(), + event.getClusterId(), + event.getMemberId())); + } + + // Validate event against the existing topology + Service service = topology.getService(event.getServiceName()); + if (service == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Service does not exist: [service] %s", event.getServiceName())); } - if ((event.getPorts() == null) || (event.getPorts().size() == 0)) { - throw new RuntimeException(String.format("No ports found in member activated event: [service] %s [cluster] %s [member] %s", + return false; + } + Cluster cluster = service.getCluster(event.getClusterId()); + if (cluster == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s", + event.getServiceName(), event.getClusterId())); + } + return false; + } + Member member = cluster.getMember(event.getMemberId()); + if (member == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s", event.getServiceName(), event.getClusterId(), event.getMemberId())); } + return false; + } - // Validate event against the existing topology - Service service = topology.getService(event.getServiceName()); - if (service == null) { - if (log.isWarnEnabled()) { - log.warn(String.format("Service does not exist: [service] %s", event.getServiceName())); - } - return false; - } - Cluster cluster = service.getCluster(event.getClusterId()); - if (cluster == null) { - if (log.isWarnEnabled()) { - log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s", - event.getServiceName(), event.getClusterId())); - } - return false; - } - Member member = cluster.getMember(event.getMemberId()); - if (member == null) { - if (log.isWarnEnabled()) { - log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s", - event.getServiceName(), - event.getClusterId(), - event.getMemberId())); + // Apply member filter + if(TopologyMemberFilter.getInstance().isActive()) { + if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) { + if (log.isDebugEnabled()) { + log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId())); } return false; } + } - // Apply member filter - if(TopologyMemberFilter.getInstance().isActive()) { - if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) { - if (log.isDebugEnabled()) { - log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId())); - } - return false; - } + if (member.getStatus() == MemberStatus.Activated) { + if (log.isWarnEnabled()) { + log.warn(String.format("Member already activated: [service] %s [cluster] %s [member] %s", + event.getServiceName(), + event.getClusterId(), + event.getMemberId())); } + } else { - if (member.getStatus() == MemberStatus.Activated) { - if (log.isWarnEnabled()) { - log.warn(String.format("Member already activated: [service] %s [cluster] %s [member] %s", - event.getServiceName(), - event.getClusterId(), - event.getMemberId())); - } - } else { - - // Apply changes to the topology - member.addPorts(event.getPorts()); - member.setMemberIp(event.getMemberIp()); - member.setStatus(MemberStatus.Activated); - - if (log.isInfoEnabled()) { - log.info(String.format("Member activated: [service] %s [cluster] %s [member] %s", - event.getServiceName(), - event.getClusterId(), - event.getMemberId())); - } - } + // Apply changes to the topology + member.addPorts(event.getPorts()); + member.setMemberIp(event.getMemberIp()); + member.setStatus(MemberStatus.Activated); - // Notify event listeners - notifyEventListeners(event); - return true; - } else { - if (nextProcessor != null) { - // ask the next processor to take care of the message. - return nextProcessor.process(type, message, topology); - } else { - throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); + if (log.isInfoEnabled()) { + log.info(String.format("Member activated: [service] %s [cluster] %s [member] %s", + event.getServiceName(), + event.getClusterId(), + event.getMemberId())); } } + + // Notify event listeners + notifyEventListeners(event); + return true; } } http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberMaintenanceModeProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberMaintenanceModeProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberMaintenanceModeProcessor.java index b6dc489..b252a61 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberMaintenanceModeProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberMaintenanceModeProcessor.java @@ -27,6 +27,7 @@ import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilte import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter; import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; import org.apache.stratos.messaging.util.Util; public class MemberMaintenanceModeProcessor extends MessageProcessor { @@ -51,98 +52,110 @@ public class MemberMaintenanceModeProcessor extends MessageProcessor { MemberMaintenanceModeEvent event = (MemberMaintenanceModeEvent) Util. jsonToObject(message, MemberMaintenanceModeEvent.class); - // Apply service filter - if (TopologyServiceFilter.getInstance().isActive()) { - if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) { - // Service is excluded, do not update topology or fire event - if (log.isDebugEnabled()) { - log.debug(String.format("Service is excluded: [service] %s", event.getServiceName())); - } - return false; - } + TopologyManager.acquireWriteLockForCluster(event.getServiceName(), event.getClusterId()); + try { + return doProcess(event, topology); + + } finally { + TopologyManager.releaseWriteLockForCluster(event.getServiceName(), event.getClusterId()); } - // Apply cluster filter - if (TopologyClusterFilter.getInstance().isActive()) { - if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) { - // Cluster is excluded, do not update topology or fire event - if (log.isDebugEnabled()) { - log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId())); - } - return false; - } + } else { + if (nextProcessor != null) { + // ask the next processor to take care of the message. + return nextProcessor.process(type, message, topology); + } else { + throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); } + } + } + + private boolean doProcess (MemberMaintenanceModeEvent event,Topology topology) { - // Validate event against the existing topology - Service service = topology.getService(event.getServiceName()); - if (service == null) { - if (log.isWarnEnabled()) { - log.warn(String.format("Service does not exist: [service] %s", - event.getServiceName())); + // Apply service filter + if (TopologyServiceFilter.getInstance().isActive()) { + if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) { + // Service is excluded, do not update topology or fire event + if (log.isDebugEnabled()) { + log.debug(String.format("Service is excluded: [service] %s", event.getServiceName())); } return false; } - Cluster cluster = service.getCluster(event.getClusterId()); - if (cluster == null) { - if (log.isWarnEnabled()) { - log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s", - event.getServiceName(), event.getClusterId())); + } + + // Apply cluster filter + if (TopologyClusterFilter.getInstance().isActive()) { + if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) { + // Cluster is excluded, do not update topology or fire event + if (log.isDebugEnabled()) { + log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId())); } return false; } - Member member = cluster.getMember(event.getMemberId()); - if (member == null) { - if (log.isWarnEnabled()) { - log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s", - event.getServiceName(), - event.getClusterId(), - event.getMemberId())); - } - return false; + } + + // Validate event against the existing topology + Service service = topology.getService(event.getServiceName()); + if (service == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Service does not exist: [service] %s", + event.getServiceName())); } + return false; + } + Cluster cluster = service.getCluster(event.getClusterId()); + if (cluster == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s", + event.getServiceName(), event.getClusterId())); + } + return false; + } + Member member = cluster.getMember(event.getMemberId()); + if (member == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s", + event.getServiceName(), + event.getClusterId(), + event.getMemberId())); + } + return false; + } - // Apply member filter - if(TopologyMemberFilter.getInstance().isActive()) { - if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) { - if (log.isDebugEnabled()) { - log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId())); - } - return false; + // Apply member filter + if(TopologyMemberFilter.getInstance().isActive()) { + if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) { + if (log.isDebugEnabled()) { + log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId())); } + return false; } + } - if (member.getStatus() == MemberStatus.In_Maintenance) { - if (log.isWarnEnabled()) { - log.warn(String.format("Member already updated as In_Maintenance: " + - "[service] %s [cluster] %s [member] %s", - event.getServiceName(), - event.getClusterId(), - event.getMemberId())); - } - } else { - - // Apply changes to the topology - member.setStatus(MemberStatus.In_Maintenance); - - if (log.isInfoEnabled()) { - log.info(String.format("Member updated as In_Maintenance: [service] %s [cluster] %s [member] %s", - event.getServiceName(), - event.getClusterId(), - event.getMemberId())); - } + if (member.getStatus() == MemberStatus.In_Maintenance) { + if (log.isWarnEnabled()) { + log.warn(String.format("Member already updated as In_Maintenance: " + + "[service] %s [cluster] %s [member] %s", + event.getServiceName(), + event.getClusterId(), + event.getMemberId())); } + } else { + // Apply changes to the topology + member.setStatus(MemberStatus.In_Maintenance); - // Notify event listeners - notifyEventListeners(event); - return true; - } else { - if (nextProcessor != null) { - // ask the next processor to take care of the message. - return nextProcessor.process(type, message, topology); - } else { - throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); + if (log.isInfoEnabled()) { + log.info(String.format("Member updated as In_Maintenance: [service] %s [cluster] %s [member] %s", + event.getServiceName(), + event.getClusterId(), + event.getMemberId())); } } + + + // Notify event listeners + notifyEventListeners(event); + return true; } } http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberReadyToShutdownMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberReadyToShutdownMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberReadyToShutdownMessageProcessor.java index 92115aa..f0c3580 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberReadyToShutdownMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberReadyToShutdownMessageProcessor.java @@ -26,6 +26,7 @@ import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilte import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter; import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; import org.apache.stratos.messaging.util.Util; public class MemberReadyToShutdownMessageProcessor extends MessageProcessor{ @@ -50,98 +51,111 @@ public class MemberReadyToShutdownMessageProcessor extends MessageProcessor{ MemberReadyToShutdownEvent event = (MemberReadyToShutdownEvent) Util. jsonToObject(message, MemberReadyToShutdownEvent.class); - // Apply service filter - if (TopologyServiceFilter.getInstance().isActive()) { - if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) { - // Service is excluded, do not update topology or fire event - if (log.isDebugEnabled()) { - log.debug(String.format("Service is excluded: [service] %s", event.getServiceName())); - } - return false; - } + TopologyManager.acquireWriteLockForCluster(event.getServiceName(), event.getClusterId()); + try { + return doProcess(event, topology); + + } finally { + TopologyManager.releaseWriteLockForCluster(event.getServiceName(), event.getClusterId()); } - // Apply cluster filter - if (TopologyClusterFilter.getInstance().isActive()) { - if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) { - // Cluster is excluded, do not update topology or fire event - if (log.isDebugEnabled()) { - log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId())); - } - return false; - } + } else { + if (nextProcessor != null) { + // ask the next processor to take care of the message. + return nextProcessor.process(type, message, topology); + } else { + throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); } + } + } + + private boolean doProcess (MemberReadyToShutdownEvent event,Topology topology) { - // Validate event against the existing topology - Service service = topology.getService(event.getServiceName()); - if (service == null) { - if (log.isWarnEnabled()) { - log.warn(String.format("Service does not exist: [service] %s", - event.getServiceName())); + // Apply service filter + if (TopologyServiceFilter.getInstance().isActive()) { + if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) { + // Service is excluded, do not update topology or fire event + if (log.isDebugEnabled()) { + log.debug(String.format("Service is excluded: [service] %s", event.getServiceName())); } return false; } - Cluster cluster = service.getCluster(event.getClusterId()); - if (cluster == null) { - if (log.isWarnEnabled()) { - log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s", - event.getServiceName(), event.getClusterId())); + } + + // Apply cluster filter + if (TopologyClusterFilter.getInstance().isActive()) { + if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) { + // Cluster is excluded, do not update topology or fire event + if (log.isDebugEnabled()) { + log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId())); } return false; } - Member member = cluster.getMember(event.getMemberId()); - if (member == null) { - if (log.isWarnEnabled()) { - log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s", - event.getServiceName(), - event.getClusterId(), - event.getMemberId())); - } - return false; + } + + // Validate event against the existing topology + Service service = topology.getService(event.getServiceName()); + if (service == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Service does not exist: [service] %s", + event.getServiceName())); } + return false; + } + Cluster cluster = service.getCluster(event.getClusterId()); + if (cluster == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s", + event.getServiceName(), event.getClusterId())); + } + return false; + } + Member member = cluster.getMember(event.getMemberId()); + if (member == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s", + event.getServiceName(), + event.getClusterId(), + event.getMemberId())); + } + return false; + } - // Apply member filter - if(TopologyMemberFilter.getInstance().isActive()) { - if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) { - if (log.isDebugEnabled()) { - log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId())); - } - return false; + // Apply member filter + if(TopologyMemberFilter.getInstance().isActive()) { + if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) { + if (log.isDebugEnabled()) { + log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId())); } + return false; } + } - if (member.getStatus() == MemberStatus.ReadyToShutDown) { - if (log.isWarnEnabled()) { - log.warn(String.format("Member already updated as Ready to Shutdown: " + - "[service] %s [cluster] %s [member] %s", - event.getServiceName(), - event.getClusterId(), - event.getMemberId())); - } - } else { - - // Apply changes to the topology - member.setStatus(MemberStatus.ReadyToShutDown); - - if (log.isInfoEnabled()) { - log.info(String.format("Member updated as Ready to shutdown: [service] %s [cluster] %s [member] %s", - event.getServiceName(), - event.getClusterId(), - event.getMemberId())); - } + if (member.getStatus() == MemberStatus.ReadyToShutDown) { + if (log.isWarnEnabled()) { + log.warn(String.format("Member already updated as Ready to Shutdown: " + + "[service] %s [cluster] %s [member] %s", + event.getServiceName(), + event.getClusterId(), + event.getMemberId())); } + } else { + // Apply changes to the topology + member.setStatus(MemberStatus.ReadyToShutDown); - // Notify event listeners - notifyEventListeners(event); - return true; - } else { - if (nextProcessor != null) { - // ask the next processor to take care of the message. - return nextProcessor.process(type, message, topology); - } else { - throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); + if (log.isInfoEnabled()) { + log.info(String.format("Member updated as Ready to shutdown: [service] %s [cluster] %s [member] %s", + event.getServiceName(), + event.getClusterId(), + event.getMemberId())); } } + + + // Notify event listeners + notifyEventListeners(event); + return true; + } } http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java index 4d93957..508ec39 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java @@ -30,6 +30,7 @@ import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilte import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter; import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; import org.apache.stratos.messaging.util.Util; public class MemberStartedMessageProcessor extends MessageProcessor { @@ -54,97 +55,109 @@ public class MemberStartedMessageProcessor extends MessageProcessor { // Parse complete message and build event MemberStartedEvent event = (MemberStartedEvent) Util.jsonToObject(message, MemberStartedEvent.class); - // Apply service filter - if (TopologyServiceFilter.getInstance().isActive()) { - if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) { - // Service is excluded, do not update topology or fire event - if (log.isDebugEnabled()) { - log.debug(String.format("Service is excluded: [service] %s", event.getServiceName())); - } - return false; - } + TopologyManager.acquireWriteLockForCluster(event.getServiceName(), event.getClusterId()); + try { + return doProcess(event, topology); + + } finally { + TopologyManager.releaseWriteLockForCluster(event.getServiceName(), event.getClusterId()); } - // Apply cluster filter - if (TopologyClusterFilter.getInstance().isActive()) { - if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) { - // Cluster is excluded, do not update topology or fire event - if (log.isDebugEnabled()) { - log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId())); - } - return false; - } + } else { + if (nextProcessor != null) { + // ask the next processor to take care of the message. + return nextProcessor.process(type, message, topology); + } else { + throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); } + } + } + + private boolean doProcess (MemberStartedEvent event,Topology topology) { - // Validate event against the existing topology - Service service = topology.getService(event.getServiceName()); - if (service == null) { - if (log.isWarnEnabled()) { - log.warn(String.format("Service does not exist: [service] %s", - event.getServiceName())); + // Apply service filter + if (TopologyServiceFilter.getInstance().isActive()) { + if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) { + // Service is excluded, do not update topology or fire event + if (log.isDebugEnabled()) { + log.debug(String.format("Service is excluded: [service] %s", event.getServiceName())); } return false; } - Cluster cluster = service.getCluster(event.getClusterId()); - if (cluster == null) { - if (log.isWarnEnabled()) { - log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s", - event.getServiceName(), event.getClusterId())); + } + + // Apply cluster filter + if (TopologyClusterFilter.getInstance().isActive()) { + if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) { + // Cluster is excluded, do not update topology or fire event + if (log.isDebugEnabled()) { + log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId())); } return false; } - Member member = cluster.getMember(event.getMemberId()); - if (member == null) { - if (log.isWarnEnabled()) { - log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s", - event.getServiceName(), - event.getClusterId(), - event.getMemberId())); - } - return false; + } + + // Validate event against the existing topology + Service service = topology.getService(event.getServiceName()); + if (service == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Service does not exist: [service] %s", + event.getServiceName())); } + return false; + } + Cluster cluster = service.getCluster(event.getClusterId()); + if (cluster == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s", + event.getServiceName(), event.getClusterId())); + } + return false; + } + Member member = cluster.getMember(event.getMemberId()); + if (member == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s", + event.getServiceName(), + event.getClusterId(), + event.getMemberId())); + } + return false; + } - // Apply member filter - if(TopologyMemberFilter.getInstance().isActive()) { - if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) { - if (log.isDebugEnabled()) { - log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId())); - } - return false; + // Apply member filter + if(TopologyMemberFilter.getInstance().isActive()) { + if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) { + if (log.isDebugEnabled()) { + log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId())); } + return false; } + } - if (member.getStatus() == MemberStatus.Starting) { - if (log.isWarnEnabled()) { - log.warn(String.format("Member already started: [service] %s [cluster] %s [member] %s", - event.getServiceName(), - event.getClusterId(), - event.getMemberId())); - } - } else { - - // Apply changes to the topology - member.setStatus(MemberStatus.Starting); - - if (log.isInfoEnabled()) { - log.info(String.format("Member started: [service] %s [cluster] %s [member] %s", - event.getServiceName(), - event.getClusterId(), - event.getMemberId())); - } + if (member.getStatus() == MemberStatus.Starting) { + if (log.isWarnEnabled()) { + log.warn(String.format("Member already started: [service] %s [cluster] %s [member] %s", + event.getServiceName(), + event.getClusterId(), + event.getMemberId())); } + } else { + // Apply changes to the topology + member.setStatus(MemberStatus.Starting); - // Notify event listeners - notifyEventListeners(event); - return true; - } else { - if (nextProcessor != null) { - // ask the next processor to take care of the message. - return nextProcessor.process(type, message, topology); - } else { - throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); + if (log.isInfoEnabled()) { + log.info(String.format("Member started: [service] %s [cluster] %s [member] %s", + event.getServiceName(), + event.getClusterId(), + event.getMemberId())); } } + + + // Notify event listeners + notifyEventListeners(event); + return true; } }
