http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java
index c007343..94b9650 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java
@@ -27,6 +27,7 @@ import 
org.apache.stratos.messaging.event.topology.ClusterCreatedEvent;
 import 
org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
 import 
org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 import org.apache.stratos.messaging.util.Util;
 
 public class ClusterCreatedMessageProcessor extends MessageProcessor {
@@ -41,83 +42,97 @@ public class ClusterCreatedMessageProcessor extends 
MessageProcessor {
 
     @Override
     public boolean process(String type, String message, Object object) {
-        Topology topology = (Topology) object;
 
+        Topology topology = (Topology) object;
         if (ClusterCreatedEvent.class.getName().equals(type)) {
             // Return if topology has not been initialized
-            if (!topology.isInitialized())
+            if (!topology.isInitialized()) {
                 return false;
+            }
 
             // Parse complete message and build event
             ClusterCreatedEvent event = (ClusterCreatedEvent) 
Util.jsonToObject(message, ClusterCreatedEvent.class);
 
-            // Apply service filter
-            if (TopologyServiceFilter.getInstance().isActive()) {
-                if 
(TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName()))
 {
-                    // Service is excluded, do not update topology or fire 
event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Service is excluded: 
[service] %s", event.getServiceName()));
-                    }
-                    return false;
-                }
-            }
+            TopologyManager.acquireReadLockForServices();
+            TopologyManager.acquireWriteLockForService(event.getServiceName());
+            try {
+                return doProcess(event, topology);
 
-            // Apply cluster filter
-            if (TopologyClusterFilter.getInstance().isActive()) {
-                if 
(TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
-                    // Cluster is excluded, do not update topology or fire 
event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Cluster is excluded: 
[cluster] %s", event.getClusterId()));
-                    }
-                    return false;
-                }
+            } finally {
+                
TopologyManager.releaseWriteLockForService(event.getServiceName());
+                TopologyManager.releaseReadLockForServices();
             }
 
-            // Validate event properties
-            Cluster cluster = event.getCluster();
-            if(cluster == null) {
-                String msg = "Cluster object of cluster created event is 
null.";
-                log.error(msg);
-                throw new RuntimeException(msg);
-            }
-            if (cluster.getHostNames().isEmpty()) {
-                throw new RuntimeException("Host name/s not found in cluster 
created event");
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process 
message using available message processors: [type] %s [body] %s", type, 
message));
             }
-            // Validate event against the existing topology
-            Service service = topology.getService(event.getServiceName());
-            if (service == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Service does not exist: [service] 
%s",
-                            event.getServiceName()));
+        }
+    }
+
+    private boolean doProcess (ClusterCreatedEvent event,Topology topology) {
+
+        // Apply service filter
+        if (TopologyServiceFilter.getInstance().isActive()) {
+            if 
(TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName()))
 {
+                // Service is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Service is excluded: [service] 
%s", event.getServiceName()));
                 }
                 return false;
             }
-            if (service.clusterExists(event.getClusterId())) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Cluster already exists in service: 
[service] %s [cluster] %s", event.getServiceName(),
-                            event.getClusterId()));
-                }
-                       } else {
-
-                               // Apply changes to the topology
-                               service.addCluster(cluster);
-                               if (log.isInfoEnabled()) {
-                                       log.info(String.format("Cluster 
created: %s",
-                                                       cluster.toString()));
-                               }
-                       }
+        }
 
-            // Notify event listeners
-            notifyEventListeners(event);
-            return true;
+        // Apply cluster filter
+        if (TopologyClusterFilter.getInstance().isActive()) {
+            if 
(TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+                // Cluster is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Cluster is excluded: [cluster] 
%s", event.getClusterId()));
+                }
+                return false;
+            }
+        }
 
+        // Validate event properties
+        Cluster cluster = event.getCluster();
+        if(cluster == null) {
+            String msg = "Cluster object of cluster created event is null.";
+            log.error(msg);
+            throw new RuntimeException(msg);
+        }
+        if (cluster.getHostNames().isEmpty()) {
+            throw new RuntimeException("Host name/s not found in cluster 
created event");
+        }
+        // Validate event against the existing topology
+        Service service = topology.getService(event.getServiceName());
+        if (service == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Service does not exist: [service] %s",
+                        event.getServiceName()));
+            }
+            return false;
+        }
+        if (service.clusterExists(event.getClusterId())) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Cluster already exists in service: 
[service] %s [cluster] %s", event.getServiceName(),
+                        event.getClusterId()));
+            }
         } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process 
message using available message processors: [type] %s [body] %s", type, 
message));
+
+            // Apply changes to the topology
+            service.addCluster(cluster);
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Cluster created: %s",
+                        cluster.toString()));
             }
         }
+
+        // Notify event listeners
+        notifyEventListeners(event);
+        return true;
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterMaintenanceModeMessageProcessor.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterMaintenanceModeMessageProcessor.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterMaintenanceModeMessageProcessor.java
index f125c54..8629363 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterMaintenanceModeMessageProcessor.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterMaintenanceModeMessageProcessor.java
@@ -25,6 +25,7 @@ import 
org.apache.stratos.messaging.event.topology.ClusterMaintenanceModeEvent;
 import 
org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
 import 
org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 import org.apache.stratos.messaging.util.Util;
 
 public class ClusterMaintenanceModeMessageProcessor extends MessageProcessor {
@@ -49,64 +50,77 @@ public class ClusterMaintenanceModeMessageProcessor extends 
MessageProcessor {
             ClusterMaintenanceModeEvent event = (ClusterMaintenanceModeEvent) 
Util.
                                 jsonToObject(message, 
ClusterMaintenanceModeEvent.class);
 
-            // Apply service filter
-            if (TopologyServiceFilter.getInstance().isActive()) {
-                if 
(TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName()))
 {
-                    // Service is excluded, do not update topology or fire 
event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Service is excluded: 
[service] %s", event.getServiceName()));
-                    }
-                    return false;
-                }
+            TopologyManager.acquireReadLockForServices();
+            TopologyManager.acquireWriteLockForService(event.getServiceName());
+            try {
+                return doProcess(event, topology);
+
+            } finally {
+                
TopologyManager.releaseWriteLockForService(event.getServiceName());
+                TopologyManager.releaseReadLockForServices();
             }
 
-            // Apply cluster filter
-            if (TopologyClusterFilter.getInstance().isActive()) {
-                if 
(TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
-                    // Cluster is excluded, do not update topology or fire 
event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Cluster is excluded: 
[cluster] %s", event.getClusterId()));
-                    }
-                    return false;
-                }
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process 
message using available message processors: [type] %s [body] %s", type, 
message));
             }
+        }
+    }
+
+    private boolean doProcess (ClusterMaintenanceModeEvent event,Topology 
topology)  {
 
-            // Validate event against the existing topology
-            Service service = topology.getService(event.getServiceName());
-            if (service == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Service does not exist: [service] 
%s",
-                            event.getServiceName()));
+        // Apply service filter
+        if (TopologyServiceFilter.getInstance().isActive()) {
+            if 
(TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName()))
 {
+                // Service is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Service is excluded: [service] 
%s", event.getServiceName()));
                 }
                 return false;
             }
-            Cluster cluster = service.getCluster(event.getClusterId());
+        }
 
-            if (cluster == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Cluster not exists in service: 
[service] %s [cluster] %s", event.getServiceName(),
-                            event.getClusterId()));
+        // Apply cluster filter
+        if (TopologyClusterFilter.getInstance().isActive()) {
+            if 
(TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+                // Cluster is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Cluster is excluded: [cluster] 
%s", event.getClusterId()));
                 }
-                       } else {
-                           // Apply changes to the topology
-                cluster.setStatus(Status.In_Maintenance);
-                               if (log.isInfoEnabled()) {
-                                       log.info(String.format("Cluster updated 
as maintenance mode: %s",
-                                                       cluster.toString()));
-                               }
-                       }
+                return false;
+            }
+        }
 
-            // Notify event listeners
-            notifyEventListeners(event);
-            return true;
+        // Validate event against the existing topology
+        Service service = topology.getService(event.getServiceName());
+        if (service == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Service does not exist: [service] %s",
+                        event.getServiceName()));
+            }
+            return false;
+        }
+        Cluster cluster = service.getCluster(event.getClusterId());
 
+        if (cluster == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Cluster not exists in service: 
[service] %s [cluster] %s", event.getServiceName(),
+                        event.getClusterId()));
+            }
         } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process 
message using available message processors: [type] %s [body] %s", type, 
message));
+            // Apply changes to the topology
+            cluster.setStatus(Status.In_Maintenance);
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Cluster updated as maintenance mode: 
%s",
+                        cluster.toString()));
             }
         }
+
+        // Notify event listeners
+        notifyEventListeners(event);
+        return true;
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java
index 69ef5b0..1dfb929 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java
@@ -26,6 +26,7 @@ import 
org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
 import 
org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
 import 
org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 import org.apache.stratos.messaging.util.Util;
 
 public class ClusterRemovedMessageProcessor extends MessageProcessor {
@@ -50,65 +51,79 @@ public class ClusterRemovedMessageProcessor extends 
MessageProcessor {
             // Parse complete message and build event
             ClusterRemovedEvent event = (ClusterRemovedEvent) 
Util.jsonToObject(message, ClusterRemovedEvent.class);
 
-            // Apply service filter
-            if (TopologyServiceFilter.getInstance().isActive()) {
-                if 
(TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName()))
 {
-                    // Service is excluded, do not update topology or fire 
event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Service is excluded: 
[service] %s", event.getServiceName()));
-                    }
-                    return false;
-                }
+            TopologyManager.acquireReadLockForServices();
+            TopologyManager.acquireWriteLockForService(event.getServiceName());
+            try {
+                return doProcess(event, topology);
+
+            } finally {
+                
TopologyManager.releaseWriteLockForService(event.getServiceName());
+                TopologyManager.releaseReadLockForServices();
             }
 
-            // Apply cluster filter
-            if (TopologyClusterFilter.getInstance().isActive()) {
-                if 
(TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
-                    // Cluster is excluded, do not update topology or fire 
event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Cluster is excluded: 
[cluster] %s", event.getClusterId()));
-                    }
-                    return false;
-                }
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process 
message using available message processors: [type] %s [body] %s", type, 
message));
             }
+        }
+    }
+
+    private boolean doProcess (ClusterRemovedEvent event,Topology topology) {
 
-            // Validate event against the existing topology
-            Service service = topology.getService(event.getServiceName());
-            if (service == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Service does not exist: [service] 
%s", event.getServiceName()));
+        // Apply service filter
+        if (TopologyServiceFilter.getInstance().isActive()) {
+            if 
(TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName()))
 {
+                // Service is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Service is excluded: [service] 
%s", event.getServiceName()));
                 }
                 return false;
             }
+        }
 
-            // Notify event listeners before removing the cluster object
-            notifyEventListeners(event);
-
-            if (!service.clusterExists(event.getClusterId())) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Cluster does not exist: [service] 
%s [cluster] %s",
-                            event.getServiceName(),
-                            event.getClusterId()));
+        // Apply cluster filter
+        if (TopologyClusterFilter.getInstance().isActive()) {
+            if 
(TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+                // Cluster is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Cluster is excluded: [cluster] 
%s", event.getClusterId()));
                 }
-            } else {
-               
-               // Apply changes to the topology
-               service.removeCluster(event.getClusterId());
-               
-               if (log.isInfoEnabled()) {
-                       log.info(String.format("Cluster removed from service: 
[service] %s [cluster] %s",
-                                       event.getServiceName(), 
event.getClusterId()));
-               }
+                return false;
+            }
+        }
+
+        // Validate event against the existing topology
+        Service service = topology.getService(event.getServiceName());
+        if (service == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Service does not exist: [service] %s", 
event.getServiceName()));
             }
+            return false;
+        }
+
+        // Notify event listeners before removing the cluster object
+        notifyEventListeners(event);
 
-            return true;
+        if (!service.clusterExists(event.getClusterId())) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Cluster does not exist: [service] %s 
[cluster] %s",
+                        event.getServiceName(),
+                        event.getClusterId()));
+            }
         } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process 
message using available message processors: [type] %s [body] %s", type, 
message));
+
+            // Apply changes to the topology
+            service.removeCluster(event.getClusterId());
+
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Cluster removed from service: 
[service] %s [cluster] %s",
+                        event.getServiceName(), event.getClusterId()));
             }
         }
+
+        return true;
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java
index 135bdae..6d5cb8f 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java
@@ -26,6 +26,7 @@ import 
org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilte
 import 
org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
 import 
org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 import org.apache.stratos.messaging.util.Util;
 
 import java.util.ArrayList;
@@ -49,102 +50,20 @@ public class CompleteTopologyMessageProcessor extends 
MessageProcessor {
         if (CompleteTopologyEvent.class.getName().equals(type)) {
                // Parse complete message and build event
                CompleteTopologyEvent event = (CompleteTopologyEvent) 
Util.jsonToObject(message, CompleteTopologyEvent.class);
-               
-            // if topology has not already initialized
-                       if (!topology.isInitialized()) {
-
-                               // Apply service filter
-                               if 
(TopologyServiceFilter.getInstance().isActive()) {
-                                       // Add services included in service 
filter
-                                       for (Service service : 
event.getTopology().getServices()) {
-                                               if 
(TopologyServiceFilter.getInstance()
-                                                               
.serviceNameIncluded(service.getServiceName())) {
-                                                       
topology.addService(service);
-                                               } else {
-                                                       if 
(log.isDebugEnabled()) {
-                                                               
log.debug(String.format(
-                                                                               
"Service is excluded: [service] %s",
-                                                                               
service.getServiceName()));
-                                                       }
-                                               }
-                                       }
-                               } else {
-                                       // Add all services
-                                       
topology.addServices(event.getTopology().getServices());
-                               }
-
-                               // Apply cluster filter
-                               if 
(TopologyClusterFilter.getInstance().isActive()) {
-                                       for (Service service : 
topology.getServices()) {
-                                               List<Cluster> clustersToRemove 
= new ArrayList<Cluster>();
-                                               for (Cluster cluster : 
service.getClusters()) {
-                                                       if 
(TopologyClusterFilter.getInstance()
-                                                                       
.clusterIdExcluded(cluster.getClusterId())) {
-                                                               
clustersToRemove.add(cluster);
-                                                       }
-                                               }
-                                               for (Cluster cluster : 
clustersToRemove) {
-                                                       
service.removeCluster(cluster);
-                                                       if 
(log.isDebugEnabled()) {
-                                                               
log.debug(String.format(
-                                                                               
"Cluster is excluded: [cluster] %s",
-                                                                               
cluster.getClusterId()));
-                                                       }
-                                               }
-                                       }
-                               }
-
-                               // Apply member filter
-                               if 
(TopologyMemberFilter.getInstance().isActive()) {
-                                       for (Service service : 
topology.getServices()) {
-                                               for (Cluster cluster : 
service.getClusters()) {
-                                                       List<Member> 
membersToRemove = new ArrayList<Member>();
-                                                       for (Member member : 
cluster.getMembers()) {
-                                                               if 
(TopologyMemberFilter.getInstance()
-                                                                               
.lbClusterIdExcluded(
-                                                                               
                member.getLbClusterId())) {
-                                                                       
membersToRemove.add(member);
-                                                               }
-                                                       }
-                                                       for (Member member : 
membersToRemove) {
-                                                               
cluster.removeMember(member);
-                                                               if 
(log.isDebugEnabled()) {
-                                                                       
log.debug(String
-                                                                               
        .format("Member is excluded: [member] %s [lb-cluster-id] %s",
-                                                                               
                        member.getMemberId(),
-                                                                               
                        member.getLbClusterId()));
-                                                               }
-                                                       }
-                                               }
-                                       }
-                               }
-
-                // add existing Applications to Topology
-                Collection<Application> applications = 
event.getTopology().getApplications();
-                if (applications != null && !applications.isEmpty()) {
-                    for (Application application : applications) {
-                        topology.addApplication(application);
-                        if (log.isDebugEnabled()) {
-                            log.debug("Application with id [ " +  
application.getId() + " ] added to Topology");
-                        }
-                    }
-                } else {
-                    if (log.isDebugEnabled()) {
-                        log.debug("No Application information found in 
Complete Topology event");
-                    }
-                }
 
-                               if (log.isInfoEnabled()) {
-                                       log.info("Topology initialized");
-                               }
+            if (!topology.isInitialized()) {
+                TopologyManager.acquireWriteLock();
+
+                try {
+                    return doProcess(event, topology);
 
-                               // Set topology initialized
-                               topology.setInitialized(true);
-                       }
+                } finally {
+                    TopologyManager.releaseWriteLock();
+                }
+            } else {
+                return true;
+            }
 
-            // Notify event listeners
-            notifyEventListeners(event);
-            return true;
         } else {
             if (nextProcessor != null) {
                 // ask the next processor to take care of the message.
@@ -153,4 +72,99 @@ public class CompleteTopologyMessageProcessor extends 
MessageProcessor {
             return false;
         }
     }
+
+    private boolean doProcess (CompleteTopologyEvent event, Topology topology) 
{
+
+        // Apply service filter
+        if (TopologyServiceFilter.getInstance().isActive()) {
+            // Add services included in service filter
+            for (Service service : event.getTopology().getServices()) {
+                if (TopologyServiceFilter.getInstance()
+                        .serviceNameIncluded(service.getServiceName())) {
+                    topology.addService(service);
+                } else {
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format(
+                                "Service is excluded: [service] %s",
+                                service.getServiceName()));
+                    }
+                }
+            }
+        } else {
+            // Add all services
+            topology.addServices(event.getTopology().getServices());
+        }
+
+        // Apply cluster filter
+        if (TopologyClusterFilter.getInstance().isActive()) {
+            for (Service service : topology.getServices()) {
+                List<Cluster> clustersToRemove = new ArrayList<Cluster>();
+                for (Cluster cluster : service.getClusters()) {
+                    if (TopologyClusterFilter.getInstance()
+                            .clusterIdExcluded(cluster.getClusterId())) {
+                        clustersToRemove.add(cluster);
+                    }
+                }
+                for (Cluster cluster : clustersToRemove) {
+                    service.removeCluster(cluster);
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format(
+                                "Cluster is excluded: [cluster] %s",
+                                cluster.getClusterId()));
+                    }
+                }
+            }
+        }
+
+        // Apply member filter
+        if (TopologyMemberFilter.getInstance().isActive()) {
+            for (Service service : topology.getServices()) {
+                for (Cluster cluster : service.getClusters()) {
+                    List<Member> membersToRemove = new ArrayList<Member>();
+                    for (Member member : cluster.getMembers()) {
+                        if (TopologyMemberFilter.getInstance()
+                                .lbClusterIdExcluded(
+                                        member.getLbClusterId())) {
+                            membersToRemove.add(member);
+                        }
+                    }
+                    for (Member member : membersToRemove) {
+                        cluster.removeMember(member);
+                        if (log.isDebugEnabled()) {
+                            log.debug(String
+                                    .format("Member is excluded: [member] %s 
[lb-cluster-id] %s",
+                                            member.getMemberId(),
+                                            member.getLbClusterId()));
+                        }
+                    }
+                }
+            }
+        }
+
+        // add existing Applications to Topology
+        Collection<Application> applications = 
event.getTopology().getApplications();
+        if (applications != null && !applications.isEmpty()) {
+            for (Application application : applications) {
+                topology.addApplication(application);
+                if (log.isDebugEnabled()) {
+                    log.debug("Application with id [ " +  application.getId() 
+ " ] added to Topology");
+                }
+            }
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("No Application information found in Complete 
Topology event");
+            }
+        }
+
+        if (log.isInfoEnabled()) {
+            log.info("Topology initialized");
+        }
+
+        // Set topology initialized
+        topology.setInitialized(true);
+
+        // Notify event listeners
+        notifyEventListeners(event);
+        return true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupActivatedProcessor.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupActivatedProcessor.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupActivatedProcessor.java
index 627d9a9..7200431 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupActivatedProcessor.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupActivatedProcessor.java
@@ -21,11 +21,9 @@ package 
org.apache.stratos.messaging.message.processor.topology;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.messaging.domain.topology.*;
-import org.apache.stratos.messaging.event.topology.ClusterActivatedEvent;
 import org.apache.stratos.messaging.event.topology.GroupActivatedEvent;
-import 
org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
-import 
org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 import org.apache.stratos.messaging.util.Util;
 
 /**
@@ -53,34 +51,16 @@ public class GroupActivatedProcessor extends 
MessageProcessor {
             GroupActivatedEvent event = (GroupActivatedEvent) Util.
                     jsonToObject(message, GroupActivatedEvent.class);
 
-            // Validate event against the existing topology
-            Application application = 
topology.getApplication(event.getAppId());
-            if (application == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Application does not exist: 
[service] %s",
-                            event.getAppId()));
-                }
-                return false;
-            }
-            Group group = application.getGroup(event.getGroupId());
+            TopologyManager.acquireReadLockForApplications();
+            TopologyManager.acquireWriteLockForApplication(event.getAppId());
 
-            if (group == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Group not exists in service: 
[AppId] %s [groupId] %s", event.getAppId(),
-                            event.getGroupId()));
-                }
-            } else {
-                // Apply changes to the topology
-                group.setStatus(Status.Activated);
-                if (log.isInfoEnabled()) {
-                    log.info(String.format("Group updated as activated : %s",
-                            group.toString()));
-                }
-            }
+            try {
+                return doProcess(event, topology);
 
-            // Notify event listeners
-            notifyEventListeners(event);
-            return true;
+            } finally {
+                
TopologyManager.releaseWriteLockForApplication(event.getAppId());
+                TopologyManager.releaseReadLockForApplications();
+            }
 
         } else {
             if (nextProcessor != null) {
@@ -91,4 +71,36 @@ public class GroupActivatedProcessor extends 
MessageProcessor {
             }
         }
     }
+
+    private boolean doProcess (GroupActivatedEvent event,Topology topology) {
+
+        // Validate event against the existing topology
+        Application application = topology.getApplication(event.getAppId());
+        if (application == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Application does not exist: [service] 
%s",
+                        event.getAppId()));
+            }
+            return false;
+        }
+        Group group = application.getGroup(event.getGroupId());
+
+        if (group == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Group not exists in service: [AppId] 
%s [groupId] %s", event.getAppId(),
+                        event.getGroupId()));
+            }
+        } else {
+            // Apply changes to the topology
+            group.setStatus(Status.Activated);
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Group updated as activated : %s",
+                        group.toString()));
+            }
+        }
+
+        // Notify event listeners
+        notifyEventListeners(event);
+        return true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedMessageProcessor.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedMessageProcessor.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedMessageProcessor.java
index 8e4e1b1..2d3f8b3 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedMessageProcessor.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedMessageProcessor.java
@@ -26,6 +26,7 @@ import 
org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilte
 import 
org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
 import 
org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 import org.apache.stratos.messaging.util.Util;
 
 public class InstanceSpawnedMessageProcessor extends MessageProcessor {
@@ -50,92 +51,103 @@ public class InstanceSpawnedMessageProcessor extends 
MessageProcessor {
             // Parse complete message and build event
             InstanceSpawnedEvent event = (InstanceSpawnedEvent) 
Util.jsonToObject(message, InstanceSpawnedEvent.class);
 
-            // Apply service filter
-            if (TopologyServiceFilter.getInstance().isActive()) {
-                if 
(TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName()))
 {
-                    // Service is excluded, do not update topology or fire 
event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Service is excluded: 
[service] %s", event.getServiceName()));
-                    }
-                    return false;
-                }
+            TopologyManager.acquireWriteLockForCluster(event.getServiceName(), 
event.getClusterId());
+            try {
+                return doProcess(event, topology);
+
+            } finally {
+                
TopologyManager.releaseWriteLockForCluster(event.getServiceName(), 
event.getClusterId());
             }
 
-            // Apply cluster filter
-            if (TopologyClusterFilter.getInstance().isActive()) {
-                if 
(TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
-                    // Cluster is excluded, do not update topology or fire 
event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Cluster is excluded: 
[cluster] %s", event.getClusterId()));
-                    }
-                    return false;
-                }
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process 
message using available message processors: [type] %s [body] %s", type, 
message));
             }
+        }
+    }
+
+    private boolean doProcess (InstanceSpawnedEvent event,Topology topology){
 
-            // Apply member filter
-            if(TopologyMemberFilter.getInstance().isActive()) {
-                
if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(event.getLbClusterId()))
 {
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Member is excluded: 
[lb-cluster-id] %s", event.getLbClusterId()));
-                    }
-                    return false;
+        // Apply service filter
+        if (TopologyServiceFilter.getInstance().isActive()) {
+            if 
(TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName()))
 {
+                // Service is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Service is excluded: [service] 
%s", event.getServiceName()));
                 }
+                return false;
             }
+        }
 
-            // Validate event against the existing topology
-            Service service = topology.getService(event.getServiceName());
-            if (service == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Service does not exist: [service] 
%s",
-                            event.getServiceName()));
+        // Apply cluster filter
+        if (TopologyClusterFilter.getInstance().isActive()) {
+            if 
(TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+                // Cluster is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Cluster is excluded: [cluster] 
%s", event.getClusterId()));
                 }
                 return false;
             }
-            Cluster cluster = service.getCluster(event.getClusterId());
-            if (cluster == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Cluster does not exist: [service] 
%s [cluster] %s",
-                            event.getServiceName(), event.getClusterId()));
+        }
+
+        // Apply member filter
+        if(TopologyMemberFilter.getInstance().isActive()) {
+            
if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(event.getLbClusterId()))
 {
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Member is excluded: 
[lb-cluster-id] %s", event.getLbClusterId()));
                 }
                 return false;
             }
-            if (cluster.memberExists(event.getMemberId())) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Member already exists: [service] 
%s [cluster] %s [member] %s",
-                            event.getServiceName(),
-                            event.getClusterId(),
-                            event.getMemberId()));
-                }
-            } else {
-               
-               // Apply changes to the topology
-               Member member = new Member(event.getServiceName(), 
event.getClusterId(), event.getNetworkPartitionId(), event.getPartitionId(), 
event.getMemberId());
-               member.setStatus(MemberStatus.Created);
-               member.setMemberPublicIp(event.getMemberPublicIp());
-               member.setMemberIp(event.getMemberIp());
-               member.setLbClusterId(event.getLbClusterId());
-                member.setProperties(event.getProperties());
-               cluster.addMember(member);
-               
-               if (log.isInfoEnabled()) {
-                       log.info(String.format("Member created: [service] %s 
[cluster] %s [member] %s",
-                                       event.getServiceName(),
-                                       event.getClusterId(),
-                                       event.getMemberId()));
-               }
+        }
+
+        // Validate event against the existing topology
+        Service service = topology.getService(event.getServiceName());
+        if (service == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Service does not exist: [service] %s",
+                        event.getServiceName()));
+            }
+            return false;
+        }
+        Cluster cluster = service.getCluster(event.getClusterId());
+        if (cluster == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Cluster does not exist: [service] %s 
[cluster] %s",
+                        event.getServiceName(), event.getClusterId()));
+            }
+            return false;
+        }
+        if (cluster.memberExists(event.getMemberId())) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Member already exists: [service] %s 
[cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
             }
+        } else {
 
+            // Apply changes to the topology
+            Member member = new Member(event.getServiceName(), 
event.getClusterId(), event.getNetworkPartitionId(), event.getPartitionId(), 
event.getMemberId());
+            member.setStatus(MemberStatus.Created);
+            member.setMemberPublicIp(event.getMemberPublicIp());
+            member.setMemberIp(event.getMemberIp());
+            member.setLbClusterId(event.getLbClusterId());
+            member.setProperties(event.getProperties());
+            cluster.addMember(member);
 
-            // Notify event listeners
-            notifyEventListeners(event);
-            return true;
-        } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process 
message using available message processors: [type] %s [body] %s", type, 
message));
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Member created: [service] %s [cluster] 
%s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
             }
         }
+
+        // Notify event listeners
+        notifyEventListeners(event);
+        return true;
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java
index a5d701d..ec1b5ec 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java
@@ -30,6 +30,7 @@ import 
org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilte
 import 
org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
 import 
org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 import org.apache.stratos.messaging.util.Util;
 
 public class MemberActivatedMessageProcessor extends MessageProcessor {
@@ -54,111 +55,123 @@ public class MemberActivatedMessageProcessor extends 
MessageProcessor {
             // Parse complete message and build event
             MemberActivatedEvent event = (MemberActivatedEvent) 
Util.jsonToObject(message, MemberActivatedEvent.class);
 
-            // Apply service filter
-            if (TopologyServiceFilter.getInstance().isActive()) {
-                if 
(TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName()))
 {
-                    // Service is excluded, do not update topology or fire 
event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Service is excluded: 
[service] %s", event.getServiceName()));
-                    }
-                    return false;
+            TopologyManager.acquireWriteLockForCluster(event.getServiceName(), 
event.getClusterId());
+            try {
+                return doProcess(event, topology);
+
+            } finally {
+                
TopologyManager.releaseWriteLockForCluster(event.getServiceName(), 
event.getClusterId());
+            }
+
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process 
message using available message processors: [type] %s [body] %s", type, 
message));
+            }
+        }
+    }
+
+    private boolean doProcess (MemberActivatedEvent event,Topology topology) {
+
+        // Apply service filter
+        if (TopologyServiceFilter.getInstance().isActive()) {
+            if 
(TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName()))
 {
+                // Service is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Service is excluded: [service] 
%s", event.getServiceName()));
                 }
+                return false;
             }
+        }
 
-            // Apply cluster filter
-            if (TopologyClusterFilter.getInstance().isActive()) {
-                if 
(TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
-                    // Cluster is excluded, do not update topology or fire 
event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Cluster is excluded: 
[cluster] %s", event.getClusterId()));
-                    }
-                    return false;
+        // Apply cluster filter
+        if (TopologyClusterFilter.getInstance().isActive()) {
+            if 
(TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+                // Cluster is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Cluster is excluded: [cluster] 
%s", event.getClusterId()));
                 }
+                return false;
             }
+        }
 
-            // Validate event properties
-            if ((event.getMemberIp() == null) || 
event.getMemberIp().isEmpty()) {
-                throw new RuntimeException(String.format("No ip address found 
in member activated event: [service] %s [cluster] %s [member] %s",
-                        event.getServiceName(),
-                        event.getClusterId(),
-                        event.getMemberId()));
+        // Validate event properties
+        if ((event.getMemberIp() == null) || event.getMemberIp().isEmpty()) {
+            throw new RuntimeException(String.format("No ip address found in 
member activated event: [service] %s [cluster] %s [member] %s",
+                    event.getServiceName(),
+                    event.getClusterId(),
+                    event.getMemberId()));
+        }
+        if ((event.getPorts() == null) || (event.getPorts().size() == 0)) {
+            throw new RuntimeException(String.format("No ports found in member 
activated event: [service] %s [cluster] %s [member] %s",
+                    event.getServiceName(),
+                    event.getClusterId(),
+                    event.getMemberId()));
+        }
+
+        // Validate event against the existing topology
+        Service service = topology.getService(event.getServiceName());
+        if (service == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Service does not exist: [service] %s", 
event.getServiceName()));
             }
-            if ((event.getPorts() == null) || (event.getPorts().size() == 0)) {
-                throw new RuntimeException(String.format("No ports found in 
member activated event: [service] %s [cluster] %s [member] %s",
+            return false;
+        }
+        Cluster cluster = service.getCluster(event.getClusterId());
+        if (cluster == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Cluster does not exist: [service] %s 
[cluster] %s",
+                        event.getServiceName(), event.getClusterId()));
+            }
+            return false;
+        }
+        Member member = cluster.getMember(event.getMemberId());
+        if (member == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Member does not exist: [service] %s 
[cluster] %s [member] %s",
                         event.getServiceName(),
                         event.getClusterId(),
                         event.getMemberId()));
             }
+            return false;
+        }
 
-            // Validate event against the existing topology
-            Service service = topology.getService(event.getServiceName());
-            if (service == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Service does not exist: [service] 
%s", event.getServiceName()));
-                }
-                return false;
-            }
-            Cluster cluster = service.getCluster(event.getClusterId());
-            if (cluster == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Cluster does not exist: [service] 
%s [cluster] %s",
-                            event.getServiceName(), event.getClusterId()));
-                }
-                return false;
-            }
-            Member member = cluster.getMember(event.getMemberId());
-            if (member == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Member does not exist: [service] 
%s [cluster] %s [member] %s",
-                            event.getServiceName(),
-                            event.getClusterId(),
-                            event.getMemberId()));
+        // Apply member filter
+        if(TopologyMemberFilter.getInstance().isActive()) {
+            
if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId()))
 {
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Member is excluded: 
[lb-cluster-id] %s", member.getLbClusterId()));
                 }
                 return false;
             }
+        }
 
-            // Apply member filter
-            if(TopologyMemberFilter.getInstance().isActive()) {
-                
if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId()))
 {
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Member is excluded: 
[lb-cluster-id] %s", member.getLbClusterId()));
-                    }
-                    return false;
-                }
+        if (member.getStatus() == MemberStatus.Activated) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Member already activated: [service] %s 
[cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
             }
+        } else {
 
-            if (member.getStatus() == MemberStatus.Activated) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Member already activated: 
[service] %s [cluster] %s [member] %s",
-                            event.getServiceName(),
-                            event.getClusterId(),
-                            event.getMemberId()));
-                }
-            } else {
-               
-               // Apply changes to the topology
-               member.addPorts(event.getPorts());
-               member.setMemberIp(event.getMemberIp());
-               member.setStatus(MemberStatus.Activated);
-               
-               if (log.isInfoEnabled()) {
-                       log.info(String.format("Member activated: [service] %s 
[cluster] %s [member] %s",
-                                       event.getServiceName(),
-                                       event.getClusterId(),
-                                       event.getMemberId()));
-               }
-            }
+            // Apply changes to the topology
+            member.addPorts(event.getPorts());
+            member.setMemberIp(event.getMemberIp());
+            member.setStatus(MemberStatus.Activated);
 
-            // Notify event listeners
-            notifyEventListeners(event);
-            return true;
-        } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process 
message using available message processors: [type] %s [body] %s", type, 
message));
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Member activated: [service] %s 
[cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
             }
         }
+
+        // Notify event listeners
+        notifyEventListeners(event);
+        return true;
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberMaintenanceModeProcessor.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberMaintenanceModeProcessor.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberMaintenanceModeProcessor.java
index b6dc489..b252a61 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberMaintenanceModeProcessor.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberMaintenanceModeProcessor.java
@@ -27,6 +27,7 @@ import 
org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilte
 import 
org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
 import 
org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 import org.apache.stratos.messaging.util.Util;
 
 public class MemberMaintenanceModeProcessor extends MessageProcessor {
@@ -51,98 +52,110 @@ public class MemberMaintenanceModeProcessor extends 
MessageProcessor {
             MemberMaintenanceModeEvent event = (MemberMaintenanceModeEvent) 
Util.
                                             jsonToObject(message, 
MemberMaintenanceModeEvent.class);
 
-            // Apply service filter
-            if (TopologyServiceFilter.getInstance().isActive()) {
-                if 
(TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName()))
 {
-                    // Service is excluded, do not update topology or fire 
event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Service is excluded: 
[service] %s", event.getServiceName()));
-                    }
-                    return false;
-                }
+            TopologyManager.acquireWriteLockForCluster(event.getServiceName(), 
event.getClusterId());
+            try {
+                return doProcess(event, topology);
+
+            } finally {
+                
TopologyManager.releaseWriteLockForCluster(event.getServiceName(), 
event.getClusterId());
             }
 
-            // Apply cluster filter
-            if (TopologyClusterFilter.getInstance().isActive()) {
-                if 
(TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
-                    // Cluster is excluded, do not update topology or fire 
event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Cluster is excluded: 
[cluster] %s", event.getClusterId()));
-                    }
-                    return false;
-                }
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process 
message using available message processors: [type] %s [body] %s", type, 
message));
             }
+        }
+    }
+
+    private boolean doProcess (MemberMaintenanceModeEvent event,Topology 
topology) {
 
-            // Validate event against the existing topology
-            Service service = topology.getService(event.getServiceName());
-            if (service == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Service does not exist: [service] 
%s",
-                            event.getServiceName()));
+        // Apply service filter
+        if (TopologyServiceFilter.getInstance().isActive()) {
+            if 
(TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName()))
 {
+                // Service is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Service is excluded: [service] 
%s", event.getServiceName()));
                 }
                 return false;
             }
-            Cluster cluster = service.getCluster(event.getClusterId());
-            if (cluster == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Cluster does not exist: [service] 
%s [cluster] %s",
-                            event.getServiceName(), event.getClusterId()));
+        }
+
+        // Apply cluster filter
+        if (TopologyClusterFilter.getInstance().isActive()) {
+            if 
(TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+                // Cluster is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Cluster is excluded: [cluster] 
%s", event.getClusterId()));
                 }
                 return false;
             }
-            Member member = cluster.getMember(event.getMemberId());
-            if (member == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Member does not exist: [service] 
%s [cluster] %s [member] %s",
-                            event.getServiceName(),
-                            event.getClusterId(),
-                            event.getMemberId()));
-                }
-                return false;
+        }
+
+        // Validate event against the existing topology
+        Service service = topology.getService(event.getServiceName());
+        if (service == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Service does not exist: [service] %s",
+                        event.getServiceName()));
             }
+            return false;
+        }
+        Cluster cluster = service.getCluster(event.getClusterId());
+        if (cluster == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Cluster does not exist: [service] %s 
[cluster] %s",
+                        event.getServiceName(), event.getClusterId()));
+            }
+            return false;
+        }
+        Member member = cluster.getMember(event.getMemberId());
+        if (member == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Member does not exist: [service] %s 
[cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
+            }
+            return false;
+        }
 
-            // Apply member filter
-            if(TopologyMemberFilter.getInstance().isActive()) {
-                
if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId()))
 {
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Member is excluded: 
[lb-cluster-id] %s", member.getLbClusterId()));
-                    }
-                    return false;
+        // Apply member filter
+        if(TopologyMemberFilter.getInstance().isActive()) {
+            
if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId()))
 {
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Member is excluded: 
[lb-cluster-id] %s", member.getLbClusterId()));
                 }
+                return false;
             }
+        }
 
-            if (member.getStatus() == MemberStatus.In_Maintenance) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Member already updated as 
In_Maintenance: " +
-                            "[service] %s [cluster] %s [member] %s",
-                            event.getServiceName(),
-                            event.getClusterId(),
-                            event.getMemberId()));
-                }
-            } else {
-               
-               // Apply changes to the topology
-               member.setStatus(MemberStatus.In_Maintenance);
-               
-               if (log.isInfoEnabled()) {
-                       log.info(String.format("Member updated as 
In_Maintenance: [service] %s [cluster] %s [member] %s",
-                                       event.getServiceName(),
-                                       event.getClusterId(),
-                                       event.getMemberId()));
-               }
+        if (member.getStatus() == MemberStatus.In_Maintenance) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Member already updated as 
In_Maintenance: " +
+                        "[service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
             }
+        } else {
 
+            // Apply changes to the topology
+            member.setStatus(MemberStatus.In_Maintenance);
 
-            // Notify event listeners
-            notifyEventListeners(event);
-            return true;
-        } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process 
message using available message processors: [type] %s [body] %s", type, 
message));
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Member updated as In_Maintenance: 
[service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
             }
         }
+
+
+        // Notify event listeners
+        notifyEventListeners(event);
+        return true;
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberReadyToShutdownMessageProcessor.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberReadyToShutdownMessageProcessor.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberReadyToShutdownMessageProcessor.java
index 92115aa..f0c3580 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberReadyToShutdownMessageProcessor.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberReadyToShutdownMessageProcessor.java
@@ -26,6 +26,7 @@ import 
org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilte
 import 
org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
 import 
org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 import org.apache.stratos.messaging.util.Util;
 
 public class MemberReadyToShutdownMessageProcessor extends MessageProcessor{
@@ -50,98 +51,111 @@ public class MemberReadyToShutdownMessageProcessor extends 
MessageProcessor{
             MemberReadyToShutdownEvent event = (MemberReadyToShutdownEvent) 
Util.
                                             jsonToObject(message, 
MemberReadyToShutdownEvent.class);
 
-            // Apply service filter
-            if (TopologyServiceFilter.getInstance().isActive()) {
-                if 
(TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName()))
 {
-                    // Service is excluded, do not update topology or fire 
event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Service is excluded: 
[service] %s", event.getServiceName()));
-                    }
-                    return false;
-                }
+            TopologyManager.acquireWriteLockForCluster(event.getServiceName(), 
event.getClusterId());
+            try {
+                return doProcess(event, topology);
+
+            } finally {
+                
TopologyManager.releaseWriteLockForCluster(event.getServiceName(), 
event.getClusterId());
             }
 
-            // Apply cluster filter
-            if (TopologyClusterFilter.getInstance().isActive()) {
-                if 
(TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
-                    // Cluster is excluded, do not update topology or fire 
event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Cluster is excluded: 
[cluster] %s", event.getClusterId()));
-                    }
-                    return false;
-                }
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process 
message using available message processors: [type] %s [body] %s", type, 
message));
             }
+        }
+    }
+
+    private boolean doProcess (MemberReadyToShutdownEvent event,Topology 
topology) {
 
-            // Validate event against the existing topology
-            Service service = topology.getService(event.getServiceName());
-            if (service == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Service does not exist: [service] 
%s",
-                            event.getServiceName()));
+        // Apply service filter
+        if (TopologyServiceFilter.getInstance().isActive()) {
+            if 
(TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName()))
 {
+                // Service is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Service is excluded: [service] 
%s", event.getServiceName()));
                 }
                 return false;
             }
-            Cluster cluster = service.getCluster(event.getClusterId());
-            if (cluster == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Cluster does not exist: [service] 
%s [cluster] %s",
-                            event.getServiceName(), event.getClusterId()));
+        }
+
+        // Apply cluster filter
+        if (TopologyClusterFilter.getInstance().isActive()) {
+            if 
(TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+                // Cluster is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Cluster is excluded: [cluster] 
%s", event.getClusterId()));
                 }
                 return false;
             }
-            Member member = cluster.getMember(event.getMemberId());
-            if (member == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Member does not exist: [service] 
%s [cluster] %s [member] %s",
-                            event.getServiceName(),
-                            event.getClusterId(),
-                            event.getMemberId()));
-                }
-                return false;
+        }
+
+        // Validate event against the existing topology
+        Service service = topology.getService(event.getServiceName());
+        if (service == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Service does not exist: [service] %s",
+                        event.getServiceName()));
             }
+            return false;
+        }
+        Cluster cluster = service.getCluster(event.getClusterId());
+        if (cluster == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Cluster does not exist: [service] %s 
[cluster] %s",
+                        event.getServiceName(), event.getClusterId()));
+            }
+            return false;
+        }
+        Member member = cluster.getMember(event.getMemberId());
+        if (member == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Member does not exist: [service] %s 
[cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
+            }
+            return false;
+        }
 
-            // Apply member filter
-            if(TopologyMemberFilter.getInstance().isActive()) {
-                
if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId()))
 {
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Member is excluded: 
[lb-cluster-id] %s", member.getLbClusterId()));
-                    }
-                    return false;
+        // Apply member filter
+        if(TopologyMemberFilter.getInstance().isActive()) {
+            
if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId()))
 {
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Member is excluded: 
[lb-cluster-id] %s", member.getLbClusterId()));
                 }
+                return false;
             }
+        }
 
-            if (member.getStatus() == MemberStatus.ReadyToShutDown) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Member already updated as Ready to 
Shutdown: " +
-                            "[service] %s [cluster] %s [member] %s",
-                            event.getServiceName(),
-                            event.getClusterId(),
-                            event.getMemberId()));
-                }
-            } else {
-               
-               // Apply changes to the topology
-               member.setStatus(MemberStatus.ReadyToShutDown);
-               
-               if (log.isInfoEnabled()) {
-                       log.info(String.format("Member updated as Ready to 
shutdown: [service] %s [cluster] %s [member] %s",
-                                       event.getServiceName(),
-                                       event.getClusterId(),
-                                       event.getMemberId()));
-               }
+        if (member.getStatus() == MemberStatus.ReadyToShutDown) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Member already updated as Ready to 
Shutdown: " +
+                        "[service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
             }
+        } else {
 
+            // Apply changes to the topology
+            member.setStatus(MemberStatus.ReadyToShutDown);
 
-            // Notify event listeners
-            notifyEventListeners(event);
-            return true;
-        } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process 
message using available message processors: [type] %s [body] %s", type, 
message));
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Member updated as Ready to shutdown: 
[service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
             }
         }
+
+
+        // Notify event listeners
+        notifyEventListeners(event);
+        return true;
+
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java
index 4d93957..508ec39 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java
@@ -30,6 +30,7 @@ import 
org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilte
 import 
org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
 import 
org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 import org.apache.stratos.messaging.util.Util;
 
 public class MemberStartedMessageProcessor extends MessageProcessor {
@@ -54,97 +55,109 @@ public class MemberStartedMessageProcessor extends 
MessageProcessor {
             // Parse complete message and build event
             MemberStartedEvent event = (MemberStartedEvent) 
Util.jsonToObject(message, MemberStartedEvent.class);
 
-            // Apply service filter
-            if (TopologyServiceFilter.getInstance().isActive()) {
-                if 
(TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName()))
 {
-                    // Service is excluded, do not update topology or fire 
event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Service is excluded: 
[service] %s", event.getServiceName()));
-                    }
-                    return false;
-                }
+            TopologyManager.acquireWriteLockForCluster(event.getServiceName(), 
event.getClusterId());
+            try {
+                return doProcess(event, topology);
+
+            } finally {
+                
TopologyManager.releaseWriteLockForCluster(event.getServiceName(), 
event.getClusterId());
             }
 
-            // Apply cluster filter
-            if (TopologyClusterFilter.getInstance().isActive()) {
-                if 
(TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
-                    // Cluster is excluded, do not update topology or fire 
event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Cluster is excluded: 
[cluster] %s", event.getClusterId()));
-                    }
-                    return false;
-                }
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process 
message using available message processors: [type] %s [body] %s", type, 
message));
             }
+        }
+    }
+
+    private boolean doProcess (MemberStartedEvent event,Topology topology) {
 
-            // Validate event against the existing topology
-            Service service = topology.getService(event.getServiceName());
-            if (service == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Service does not exist: [service] 
%s",
-                            event.getServiceName()));
+        // Apply service filter
+        if (TopologyServiceFilter.getInstance().isActive()) {
+            if 
(TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName()))
 {
+                // Service is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Service is excluded: [service] 
%s", event.getServiceName()));
                 }
                 return false;
             }
-            Cluster cluster = service.getCluster(event.getClusterId());
-            if (cluster == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Cluster does not exist: [service] 
%s [cluster] %s",
-                            event.getServiceName(), event.getClusterId()));
+        }
+
+        // Apply cluster filter
+        if (TopologyClusterFilter.getInstance().isActive()) {
+            if 
(TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+                // Cluster is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Cluster is excluded: [cluster] 
%s", event.getClusterId()));
                 }
                 return false;
             }
-            Member member = cluster.getMember(event.getMemberId());
-            if (member == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Member does not exist: [service] 
%s [cluster] %s [member] %s",
-                            event.getServiceName(),
-                            event.getClusterId(),
-                            event.getMemberId()));
-                }
-                return false;
+        }
+
+        // Validate event against the existing topology
+        Service service = topology.getService(event.getServiceName());
+        if (service == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Service does not exist: [service] %s",
+                        event.getServiceName()));
             }
+            return false;
+        }
+        Cluster cluster = service.getCluster(event.getClusterId());
+        if (cluster == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Cluster does not exist: [service] %s 
[cluster] %s",
+                        event.getServiceName(), event.getClusterId()));
+            }
+            return false;
+        }
+        Member member = cluster.getMember(event.getMemberId());
+        if (member == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Member does not exist: [service] %s 
[cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
+            }
+            return false;
+        }
 
-            // Apply member filter
-            if(TopologyMemberFilter.getInstance().isActive()) {
-                
if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId()))
 {
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Member is excluded: 
[lb-cluster-id] %s", member.getLbClusterId()));
-                    }
-                    return false;
+        // Apply member filter
+        if(TopologyMemberFilter.getInstance().isActive()) {
+            
if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId()))
 {
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Member is excluded: 
[lb-cluster-id] %s", member.getLbClusterId()));
                 }
+                return false;
             }
+        }
 
-            if (member.getStatus() == MemberStatus.Starting) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Member already started: [service] 
%s [cluster] %s [member] %s",
-                            event.getServiceName(),
-                            event.getClusterId(),
-                            event.getMemberId()));
-                }
-            } else {
-               
-               // Apply changes to the topology
-               member.setStatus(MemberStatus.Starting);
-               
-               if (log.isInfoEnabled()) {
-                       log.info(String.format("Member started: [service] %s 
[cluster] %s [member] %s",
-                                       event.getServiceName(),
-                                       event.getClusterId(),
-                                       event.getMemberId()));
-               }
+        if (member.getStatus() == MemberStatus.Starting) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Member already started: [service] %s 
[cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
             }
+        } else {
 
+            // Apply changes to the topology
+            member.setStatus(MemberStatus.Starting);
 
-            // Notify event listeners
-            notifyEventListeners(event);
-            return true;
-        } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process 
message using available message processors: [type] %s [body] %s", type, 
message));
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Member started: [service] %s [cluster] 
%s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
             }
         }
+
+
+        // Notify event listeners
+        notifyEventListeners(event);
+        return true;
     }
 }

Reply via email to