initial changes for hierarchical topology locking

Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/4ace39c8
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/4ace39c8
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/4ace39c8

Branch: refs/heads/4.0.0-grouping
Commit: 4ace39c85ab1f989e888dcc48afd2c1092ff245f
Parents: 2536b30
Author: Isuru Haththotuwa <[email protected]>
Authored: Tue Oct 7 10:21:48 2014 +0530
Committer: Isuru Haththotuwa <[email protected]>
Committed: Tue Oct 7 19:08:54 2014 +0530

----------------------------------------------------------------------
 .../AutoscalerHealthStatEventReceiver.java      |  18 +-
 .../AutoscalerTopologyEventReceiver.java        | 151 ++++---
 .../controller/topology/TopologyBuilder.java    |   7 +-
 .../LoadBalancerTopologyEventReceiver.java      |  77 +++-
 .../StratosManagerTopologyEventReceiver.java    |  78 +++-
 .../messaging/domain/topology/Service.java      |   9 +-
 .../messaging/domain/topology/Topology.java     |  11 +-
 .../domain/topology/locking/TopologyLock.java   |  49 +++
 .../topology/locking/TopologyLockHierarchy.java | 147 +++++++
 .../ApplicationActivatedMessageProcessor.java   |  54 ++-
 .../ApplicationCreatedMessageProcessor.java     |  65 +--
 .../ApplicationRemovedMessageProcessor.java     |  73 ++--
 .../topology/ClusterActivatedProcessor.java     | 108 +++--
 .../ClusterCreatedMessageProcessor.java         | 131 +++---
 .../ClusterMaintenanceModeMessageProcessor.java | 104 +++--
 .../ClusterRemovedMessageProcessor.java         | 107 +++--
 .../CompleteTopologyMessageProcessor.java       | 200 +++++----
 .../topology/GroupActivatedProcessor.java       |  70 +--
 .../InstanceSpawnedMessageProcessor.java        | 150 ++++---
 .../MemberActivatedMessageProcessor.java        | 183 ++++----
 .../MemberMaintenanceModeProcessor.java         | 159 +++----
 .../MemberReadyToShutdownMessageProcessor.java  | 160 +++----
 .../topology/MemberStartedMessageProcessor.java | 157 +++----
 .../MemberSuspendedMessageProcessor.java        | 155 +++----
 .../MemberTerminatedMessageProcessor.java       | 139 +++---
 .../ServiceCreatedMessageProcessor.java         |  74 ++--
 .../ServiceRemovedMessageProcessor.java         |  71 ++--
 .../topology/TopologyMessageProcessorChain.java |  10 +-
 .../topology/TopologyEventMessageDelegator.java |  10 +-
 .../receiver/topology/TopologyManager.java      | 425 ++++++++++++++++++-
 30 files changed, 2070 insertions(+), 1082 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java
index 26d3179..a1213f6 100644
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java
@@ -593,8 +593,11 @@ public class AutoscalerHealthStatEventReceiver implements 
Runnable {
     }
 
     private Member findMember(String memberId) {
+
+        TopologyManager.acquireReadLockForServices();
+
         try {
-            TopologyManager.acquireReadLock();
+            //TopologyManager.acquireReadLock();
             for(Service service : TopologyManager.getTopology().getServices()) 
{
                 for(Cluster cluster : service.getClusters()) {
                     if(cluster.memberExists(memberId)) {
@@ -605,7 +608,8 @@ public class AutoscalerHealthStatEventReceiver implements 
Runnable {
             return null;
         }
         finally {
-            TopologyManager.releaseReadLock();
+            //TopologyManager.releaseReadLock();
+            TopologyManager.releaseReadLockForServices();
         }
     }
 
@@ -613,8 +617,13 @@ public class AutoscalerHealthStatEventReceiver implements 
Runnable {
         try {
             AbstractClusterMonitor monitor = getMonitor(clusterId);
             NetworkPartitionContext nwPartitionCtxt;
+
+            // TODO: the optimal way would be to add the service name to the member 
fault event and use it to acquire a
+            // hierarchical lock via 
TopologyManager.acquireReadLockForCluster(serviceName, clusterId)
+            TopologyManager.acquireReadLockForServices();
+
             try{
-               TopologyManager.acquireReadLock();
+               //TopologyManager.acquireReadLock();
                Member member = findMember(memberId);
                
                if(null == member){
@@ -634,7 +643,8 @@ public class AutoscalerHealthStatEventReceiver implements 
Runnable {
                 }
 
             }finally{
-               TopologyManager.releaseReadLock();
+               //TopologyManager.releaseReadLock();
+                TopologyManager.releaseReadLockForServices();
             }
             // start a new member in the same Partition
             String partitionId = monitor.getPartitionOfMember(memberId);

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
index 3e1bfe2..4064510 100644
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
@@ -90,22 +90,23 @@ public class AutoscalerTopologyEventReceiver implements 
Runnable {
         topologyEventReceiver.addEventListener(new 
CompleteTopologyEventListener() {
             @Override
             protected void onEvent(Event event) {
-                try {
+
+                if (!topologyInitialized) {
                     TopologyManager.acquireReadLock();
-                    if(!topologyInitialized) {
-                          topologyInitialized = true;
+
+                    try {
                         for (Application application : 
TopologyManager.getTopology().getApplications()) {
                             startApplicationMonitor(application);
                         }
+
+                        topologyInitialized = true;
+                    } catch (Exception e) {
+                        log.error("Error processing event", e);
+                    } finally {
+                        TopologyManager.releaseReadLock();
                     }
-                } catch (Exception e) {
-                    log.error("Error processing event", e);
-                } finally {
-                    TopologyManager.releaseReadLock();
                 }
             }
-
-
         });
 
 
@@ -118,12 +119,19 @@ public class AutoscalerTopologyEventReceiver implements 
Runnable {
                 ApplicationCreatedEvent applicationCreatedEvent = 
(ApplicationCreatedEvent) event;
 
                 //acquire read lock
-                TopologyManager.acquireReadLock();
-                //start the application monitor
-                //TODO catch exception by ApplicationMonitor
-                
startApplicationMonitor(applicationCreatedEvent.getApplication());
-                //release read lock
-                TopologyManager.releaseReadLock();
+                //TopologyManager.acquireReadLock();
+                
TopologyManager.acquireReadLockForApplication(applicationCreatedEvent.getApplication().getId());
+
+                try {
+                    //start the application monitor
+                    //TODO catch exception by ApplicationMonitor
+                    
startApplicationMonitor(applicationCreatedEvent.getApplication());
+
+                } finally {
+                    //release read lock
+                    
TopologyManager.releaseReadLockForApplication(applicationCreatedEvent.getApplication().getId());
+                    //TopologyManager.releaseReadLock();
+                }
 
             }
         });
@@ -197,7 +205,8 @@ public class AutoscalerTopologyEventReceiver implements 
Runnable {
                 ApplicationRemovedEvent applicationRemovedEvent = 
(ApplicationRemovedEvent) event;
 
                 //acquire read lock
-                TopologyManager.acquireReadLock();
+                //TopologyManager.acquireReadLock();
+                
TopologyManager.acquireReadLockForApplication(applicationRemovedEvent.getApplicationId());
 
                 try {
                     //TODO remove monitors as well as any starting or pending 
threads
@@ -222,7 +231,8 @@ public class AutoscalerTopologyEventReceiver implements 
Runnable {
 
                 } finally {
                     //release read lock
-                    TopologyManager.releaseReadLock();
+                    
TopologyManager.releaseReadLockForApplication(applicationRemovedEvent.getApplicationId());
+                    //TopologyManager.releaseReadLock();
                 }
 
             }
@@ -283,26 +293,34 @@ public class AutoscalerTopologyEventReceiver implements 
Runnable {
         topologyEventReceiver.addEventListener(new 
ClusterMaintenanceModeEventListener() {
             @Override
             protected void onEvent(Event event) {
+
+                ClusterMaintenanceModeEvent clusterMaitenanceEvent = null;
+
                 try {
                     log.info("Event received: " + event);
-                    ClusterMaintenanceModeEvent e = 
(ClusterMaintenanceModeEvent) event;
-                    TopologyManager.acquireReadLock();
-                    Service service = 
TopologyManager.getTopology().getService(e.getServiceName());
-                    Cluster cluster = service.getCluster(e.getClusterId());
+                    clusterMaitenanceEvent = (ClusterMaintenanceModeEvent) 
event;
+                    //TopologyManager.acquireReadLock();
+                    
TopologyManager.acquireWriteLockForCluster(clusterMaitenanceEvent.getServiceName(),
+                            clusterMaitenanceEvent.getClusterId());
+
+                    Service service = 
TopologyManager.getTopology().getService(clusterMaitenanceEvent.getServiceName());
+                    Cluster cluster = 
service.getCluster(clusterMaitenanceEvent.getClusterId());
                     if 
(AutoscalerContext.getInstance().monitorExist((cluster.getClusterId()))) {
-                        
AutoscalerContext.getInstance().getMonitor(e.getClusterId()).
-                                                        
setStatus(e.getStatus());
+                        
AutoscalerContext.getInstance().getMonitor(clusterMaitenanceEvent.getClusterId()).
+                                                        
setStatus(clusterMaitenanceEvent.getStatus());
                     } else if (AutoscalerContext.getInstance().
                                         
lbMonitorExist((cluster.getClusterId()))) {
-                        
AutoscalerContext.getInstance().getLBMonitor(e.getClusterId()).
-                                                        
setStatus(e.getStatus());
+                        
AutoscalerContext.getInstance().getLBMonitor(clusterMaitenanceEvent.getClusterId()).
+                                                        
setStatus(clusterMaitenanceEvent.getStatus());
                     } else {
                         log.error("cluster monitor not exists for the cluster: 
" + cluster.toString());
                     }
                 } catch (Exception e) {
                     log.error("Error processing event", e);
                 } finally {
-                    TopologyManager.releaseReadLock();
+                    //TopologyManager.releaseReadLock();
+                    
TopologyManager.releaseWriteLockForCluster(clusterMaitenanceEvent.getServiceName(),
+                            clusterMaitenanceEvent.getClusterId());
                 }
             }
 
@@ -312,16 +330,20 @@ public class AutoscalerTopologyEventReceiver implements 
Runnable {
         topologyEventReceiver.addEventListener(new 
ClusterRemovedEventListener() {
             @Override
             protected void onEvent(Event event) {
+
+                ClusterRemovedEvent clusterRemovedEvent = null;
                 try {
-                    ClusterRemovedEvent e = (ClusterRemovedEvent) event;
-                    TopologyManager.acquireReadLock();
+                    clusterRemovedEvent = (ClusterRemovedEvent) event;
+                    //TopologyManager.acquireReadLock();
+                    
TopologyManager.acquireReadLockForCluster(clusterRemovedEvent.getServiceName(),
+                            clusterRemovedEvent.getClusterId());
 
-                    String clusterId = e.getClusterId();
-                    String deploymentPolicy = e.getDeploymentPolicy();
+                    String clusterId = clusterRemovedEvent.getClusterId();
+                    String deploymentPolicy = 
clusterRemovedEvent.getDeploymentPolicy();
 
                     AbstractClusterMonitor monitor;
 
-                    if (e.isLbCluster()) {
+                    if (clusterRemovedEvent.isLbCluster()) {
                         DeploymentPolicy depPolicy = 
PolicyManager.getInstance().
                                                         
getDeploymentPolicy(deploymentPolicy);
                         if (depPolicy != null) {
@@ -362,7 +384,9 @@ public class AutoscalerTopologyEventReceiver implements 
Runnable {
                 } catch (Exception e) {
                     log.error("Error processing event", e);
                 } finally {
-                    TopologyManager.releaseReadLock();
+                    //TopologyManager.releaseReadLock();
+                    
TopologyManager.releaseReadLockForCluster(clusterRemovedEvent.getServiceName(),
+                            clusterRemovedEvent.getClusterId());
                 }
             }
 
@@ -380,14 +404,19 @@ public class AutoscalerTopologyEventReceiver implements 
Runnable {
             @Override
             protected void onEvent(Event event) {
 
+                MemberTerminatedEvent memberTerminatedEvent =  null;
                 try {
-                    TopologyManager.acquireReadLock();
-                    MemberTerminatedEvent e = (MemberTerminatedEvent) event;
-                    String networkPartitionId = e.getNetworkPartitionId();
-                    String clusterId = e.getClusterId();
-                    String partitionId = e.getPartitionId();
+                    //TopologyManager.acquireReadLock();
+
+                    memberTerminatedEvent = (MemberTerminatedEvent) event;
+                    String networkPartitionId = 
memberTerminatedEvent.getNetworkPartitionId();
+                    String clusterId = memberTerminatedEvent.getClusterId();
+                    String partitionId = 
memberTerminatedEvent.getPartitionId();
                     AbstractClusterMonitor monitor;
 
+                    
TopologyManager.acquireReadLockForCluster(memberTerminatedEvent.getServiceName(),
+                            memberTerminatedEvent.getClusterId());
+
                     if 
(AutoscalerContext.getInstance().monitorExist(clusterId)) {
                         monitor = 
AutoscalerContext.getInstance().getMonitor(clusterId);
                     } else {
@@ -400,7 +429,7 @@ public class AutoscalerTopologyEventReceiver implements 
Runnable {
 
                     PartitionContext partitionContext = 
networkPartitionContext.
                                                                     
getPartitionCtxt(partitionId);
-                    String memberId = e.getMemberId();
+                    String memberId = memberTerminatedEvent.getMemberId();
                     partitionContext.removeMemberStatsContext(memberId);
 
                     if 
(partitionContext.removeTerminationPendingMember(memberId)) {
@@ -431,7 +460,9 @@ public class AutoscalerTopologyEventReceiver implements 
Runnable {
                 } catch (Exception e) {
                     log.error("Error processing event", e);
                 } finally {
-                    TopologyManager.releaseReadLock();
+                    //TopologyManager.releaseReadLock();
+                    
TopologyManager.releaseReadLockForCluster(memberTerminatedEvent.getServiceName(),
+                            memberTerminatedEvent.getClusterId());
                 }
             }
 
@@ -441,14 +472,18 @@ public class AutoscalerTopologyEventReceiver implements 
Runnable {
             @Override
             protected void onEvent(Event event) {
 
+                MemberActivatedEvent memberActivatedEvent = 
(MemberActivatedEvent) event;
+
+                //TopologyManager.acquireReadLock();
+                
TopologyManager.acquireReadLockForCluster(memberActivatedEvent.getServiceName(),
+                        memberActivatedEvent.getClusterId());
+
                 try {
-                    TopologyManager.acquireReadLock();
 
-                    MemberActivatedEvent e = (MemberActivatedEvent) event;
-                    String networkPartitionId = e.getNetworkPartitionId();
-                    String clusterId = e.getClusterId();
-                    String partitionId = e.getPartitionId();
-                    String memberId = e.getMemberId();
+                    String networkPartitionId = 
memberActivatedEvent.getNetworkPartitionId();
+                    String clusterId = memberActivatedEvent.getClusterId();
+                    String partitionId = memberActivatedEvent.getPartitionId();
+                    String memberId = memberActivatedEvent.getMemberId();
 
                     AbstractClusterMonitor monitor;
 
@@ -476,12 +511,14 @@ public class AutoscalerTopologyEventReceiver implements 
Runnable {
 //                partitionContext.incrementCurrentActiveMemberCount(1);
                     
partitionContext.movePendingMemberToActiveMembers(memberId);
                     //triggering the status checker
-                    
StatusChecker.getInstance().onMemberStatusChange(e.getClusterId());
+                    
StatusChecker.getInstance().onMemberStatusChange(memberActivatedEvent.getClusterId());
 
                 } catch (Exception e) {
                     log.error("Error processing event", e);
                 } finally {
-                    TopologyManager.releaseReadLock();
+                    //TopologyManager.releaseReadLock();
+                    
TopologyManager.releaseReadLockForCluster(memberActivatedEvent.getServiceName(),
+                            memberActivatedEvent.getClusterId());
                 }
             }
         });
@@ -490,16 +527,20 @@ public class AutoscalerTopologyEventReceiver implements 
Runnable {
             @Override
             protected void onEvent(Event event) {
 
+                MemberMaintenanceModeEvent memberMaintenanceModeEvent = 
(MemberMaintenanceModeEvent) event;
+
+                //TopologyManager.acquireReadLock();
+                
TopologyManager.acquireReadLockForCluster(memberMaintenanceModeEvent.getServiceName(),
+                        memberMaintenanceModeEvent.getClusterId());
+
                 try {
-                    TopologyManager.acquireReadLock();
 
-                    MemberMaintenanceModeEvent e = 
(MemberMaintenanceModeEvent) event;
-                    String memberId = e.getMemberId();
-                    String partitionId = e.getPartitionId();
-                    String networkPartitionId = e.getNetworkPartitionId();
+                    String memberId = memberMaintenanceModeEvent.getMemberId();
+                    String partitionId = 
memberMaintenanceModeEvent.getPartitionId();
+                    String networkPartitionId = 
memberMaintenanceModeEvent.getNetworkPartitionId();
 
                     PartitionContext partitionContext;
-                    String clusterId = e.getClusterId();
+                    String clusterId = 
memberMaintenanceModeEvent.getClusterId();
                     AbstractClusterMonitor monitor;
 
                     if 
(AutoscalerContext.getInstance().monitorExist(clusterId)) {
@@ -521,7 +562,9 @@ public class AutoscalerTopologyEventReceiver implements 
Runnable {
                 } catch (Exception e) {
                     log.error("Error processing event", e);
                 } finally {
-                    TopologyManager.releaseReadLock();
+                    //TopologyManager.releaseReadLock();
+                    
TopologyManager.releaseReadLockForCluster(memberMaintenanceModeEvent.getServiceName(),
+                            memberMaintenanceModeEvent.getClusterId());
                 }
             }
         });

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java
index 6d50f6c..30b1b00 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java
@@ -608,7 +608,7 @@ public class TopologyBuilder {
         }
     }
 
-    public static void handleApplicationDeployed(Application application,
+    public static synchronized void handleApplicationDeployed(Application 
application,
                                                  
Set<ApplicationClusterContext> applicationClusterContexts,
                                                  Set<MetaDataHolder> 
metaDataHolders) {
 
@@ -644,6 +644,7 @@ public class TopologyBuilder {
             // add to Topology and update
             topology.addApplication(application);
             TopologyManager.updateTopology(topology);
+
             log.info("Application with id [ " + application.getId() + " ] 
added to Topology successfully");
 
             TopologyEventPublisher.sendApplicationCreatedEvent(application 
,clusters);
@@ -653,7 +654,8 @@ public class TopologyBuilder {
         }
     }
 
-    public static void handleApplicationUndeployed(FasterLookUpDataHolder 
dataHolder, String applicationId, int tenantId, String tenantDomain) {
+    public static synchronized void 
handleApplicationUndeployed(FasterLookUpDataHolder dataHolder,
+                                                                String 
applicationId, int tenantId, String tenantDomain) {
 
         Topology topology = TopologyManager.getTopology();
 
@@ -672,6 +674,7 @@ public class TopologyBuilder {
                     if (service != null) {
                         // remove Cluster
                         
service.removeCluster(clusterDataHolder.getClusterId());
+
                         if (log.isDebugEnabled()) {
                             log.debug("Removed cluster with id " + 
clusterDataHolder.getClusterId());
                         }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyEventReceiver.java
 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyEventReceiver.java
index a5b5e42..4222075 100644
--- 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyEventReceiver.java
+++ 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyEventReceiver.java
@@ -118,10 +118,15 @@ public class LoadBalancerTopologyEventReceiver implements 
Runnable {
         topologyEventReceiver.addEventListener(new 
MemberActivatedEventListener() {
             @Override
             protected void onEvent(Event event) {
+
+                MemberActivatedEvent memberActivatedEvent = 
(MemberActivatedEvent) event;
+
+                //TopologyManager.acquireReadLock();
+                
TopologyManager.acquireWriteLockForCluster(memberActivatedEvent.getServiceName(),
+                        memberActivatedEvent.getClusterId());
+
                 try {
-                    TopologyManager.acquireReadLock();
 
-                    MemberActivatedEvent memberActivatedEvent = 
(MemberActivatedEvent) event;
                     Service service = 
TopologyManager.getTopology().getService(memberActivatedEvent.getServiceName());
                     if (service == null) {
                         if (log.isWarnEnabled()) {
@@ -167,16 +172,24 @@ public class LoadBalancerTopologyEventReceiver implements 
Runnable {
                 } catch (Exception e) {
                     log.error("Error processing event", e);
                 } finally {
-                    TopologyManager.releaseReadLock();
+                    //TopologyManager.releaseReadLock();
+                    
TopologyManager.releaseWriteLockForCluster(memberActivatedEvent.getServiceName(),
+                            memberActivatedEvent.getClusterId());
                 }
             }
         });
         topologyEventReceiver.addEventListener(new MemberMaintenanceListener() 
{
             @Override
             protected void onEvent(Event event) {
+
+                MemberMaintenanceModeEvent memberMaintenanceModeEvent = 
(MemberMaintenanceModeEvent) event;
+
+                
TopologyManager.acquireWriteLockForCluster(memberMaintenanceModeEvent.getServiceName(),
+                        memberMaintenanceModeEvent.getClusterId());
+
                 try {
-                    TopologyManager.acquireReadLock();
-                    MemberMaintenanceModeEvent memberMaintenanceModeEvent = 
(MemberMaintenanceModeEvent) event;
+                    //TopologyManager.acquireReadLock();
+
                     Member member = 
findMember(memberMaintenanceModeEvent.getServiceName(),
                             memberMaintenanceModeEvent.getClusterId(), 
memberMaintenanceModeEvent.getMemberId());
 
@@ -186,16 +199,22 @@ public class LoadBalancerTopologyEventReceiver implements 
Runnable {
                 } catch (Exception e) {
                     log.error("Error processing event", e);
                 } finally {
-                    TopologyManager.releaseReadLock();
+                    //TopologyManager.releaseReadLock();
+                    
TopologyManager.releaseWriteLockForCluster(memberMaintenanceModeEvent.getServiceName(),
+                            memberMaintenanceModeEvent.getClusterId());
                 }
             }
         });
         topologyEventReceiver.addEventListener(new 
MemberSuspendedEventListener() {
             @Override
             protected void onEvent(Event event) {
+
+                MemberSuspendedEvent memberSuspendedEvent = 
(MemberSuspendedEvent) event;
+                
TopologyManager.acquireWriteLockForCluster(memberSuspendedEvent.getServiceName(),
+                        memberSuspendedEvent.getClusterId());
+
                 try {
-                    TopologyManager.acquireReadLock();
-                    MemberSuspendedEvent memberSuspendedEvent = 
(MemberSuspendedEvent) event;
+                    //TopologyManager.acquireReadLock();
                     Member member = 
findMember(memberSuspendedEvent.getServiceName(),
                             memberSuspendedEvent.getClusterId(), 
memberSuspendedEvent.getMemberId());
 
@@ -205,16 +224,23 @@ public class LoadBalancerTopologyEventReceiver implements 
Runnable {
                 } catch (Exception e) {
                     log.error("Error processing event", e);
                 } finally {
-                    TopologyManager.releaseReadLock();
+                    //TopologyManager.releaseReadLock();
+                    
TopologyManager.releaseWriteLockForCluster(memberSuspendedEvent.getServiceName(),
+                            memberSuspendedEvent.getClusterId());
                 }
             }
         });
         topologyEventReceiver.addEventListener(new 
MemberTerminatedEventListener() {
             @Override
             protected void onEvent(Event event) {
+
+                //TopologyManager.acquireReadLock();
+                MemberTerminatedEvent memberTerminatedEvent = 
(MemberTerminatedEvent) event;
+
+                
TopologyManager.acquireWriteLockForCluster(memberTerminatedEvent.getServiceName(),
+                        memberTerminatedEvent.getClusterId());
+
                 try {
-                    TopologyManager.acquireReadLock();
-                    MemberTerminatedEvent memberTerminatedEvent = 
(MemberTerminatedEvent) event;
                     Member member = 
findMember(memberTerminatedEvent.getServiceName(),
                             memberTerminatedEvent.getClusterId(), 
memberTerminatedEvent.getMemberId());
 
@@ -224,18 +250,23 @@ public class LoadBalancerTopologyEventReceiver implements 
Runnable {
                 } catch (Exception e) {
                     log.error("Error processing event", e);
                 } finally {
-                    TopologyManager.releaseReadLock();
+                    //TopologyManager.releaseReadLock();
+                    
TopologyManager.releaseWriteLockForCluster(memberTerminatedEvent.getServiceName(),
+                            memberTerminatedEvent.getClusterId());
                 }
             }
         });
         topologyEventReceiver.addEventListener(new 
ClusterRemovedEventListener() {
             @Override
             protected void onEvent(Event event) {
-                try {
-                    TopologyManager.acquireReadLock();
 
-                    // Remove cluster from context
-                    ClusterRemovedEvent clusterRemovedEvent = 
(ClusterRemovedEvent) event;
+                // Remove cluster from context
+                ClusterRemovedEvent clusterRemovedEvent = 
(ClusterRemovedEvent) event;
+                
TopologyManager.acquireWriteLockForCluster(clusterRemovedEvent.getServiceName(),
+                        clusterRemovedEvent.getClusterId());
+
+                try {
+                    //TopologyManager.acquireReadLock();
                     Cluster cluster = 
LoadBalancerContext.getInstance().getClusterIdClusterMap().getCluster(clusterRemovedEvent.getClusterId());
                     if (cluster != null) {
                         for (Member member : cluster.getMembers()) {
@@ -251,18 +282,23 @@ public class LoadBalancerTopologyEventReceiver implements 
Runnable {
                 } catch (Exception e) {
                     log.error("Error processing event", e);
                 } finally {
-                    TopologyManager.releaseReadLock();
+                    //TopologyManager.releaseReadLock();
+                    
TopologyManager.releaseWriteLockForCluster(clusterRemovedEvent.getServiceName(),
+                            clusterRemovedEvent.getClusterId());
                 }
             }
         });
         topologyEventReceiver.addEventListener(new 
ServiceRemovedEventListener() {
             @Override
             protected void onEvent(Event event) {
+
+                ServiceRemovedEvent serviceRemovedEvent = 
(ServiceRemovedEvent) event;
+                TopologyManager.acquireWriteLockForServices();
+
                 try {
-                    TopologyManager.acquireReadLock();
+                    //TopologyManager.acquireReadLock();
 
                     // Remove all clusters of given service from context
-                    ServiceRemovedEvent serviceRemovedEvent = 
(ServiceRemovedEvent) event;
                     Service service = 
TopologyManager.getTopology().getService(serviceRemovedEvent.getServiceName());
                     if (service != null) {
                         for (Cluster cluster : service.getClusters()) {
@@ -280,7 +316,8 @@ public class LoadBalancerTopologyEventReceiver implements 
Runnable {
                 } catch (Exception e) {
                     log.error("Error processing event", e);
                 } finally {
-                    TopologyManager.releaseReadLock();
+                    //TopologyManager.releaseReadLock();
+                    TopologyManager.releaseWriteLockForServices();
                 }
             }
         });

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyEventReceiver.java
 
b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyEventReceiver.java
index c3a5719..d61f474 100644
--- 
a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyEventReceiver.java
+++ 
b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyEventReceiver.java
@@ -23,8 +23,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.manager.exception.ADCException;
 import org.apache.stratos.manager.exception.ApplicationSubscriptionException;
-import 
org.apache.stratos.manager.exception.CompositeApplicationDefinitionException;
-import org.apache.stratos.manager.exception.CompositeApplicationException;
 import org.apache.stratos.manager.manager.CartridgeSubscriptionManager;
 import org.apache.stratos.manager.subscription.ApplicationSubscription;
 import 
org.apache.stratos.manager.topology.model.TopologyClusterInformationModel;
@@ -95,7 +93,9 @@ public class StratosManagerTopologyEventReceiver implements 
Runnable {
 
                 String serviceType = clustercreatedEvent.getServiceName();
                 //acquire read lock
-                TopologyManager.acquireReadLock();
+                //TopologyManager.acquireReadLock();
+                
TopologyManager.acquireReadLockForCluster(clustercreatedEvent.getServiceName(),
+                        clustercreatedEvent.getClusterId());
 
                 try {
                     Cluster cluster = 
TopologyManager.getTopology().getService(serviceType).getCluster(clustercreatedEvent.getClusterId());
@@ -103,7 +103,9 @@ public class StratosManagerTopologyEventReceiver implements 
Runnable {
 
                 } finally {
                     //release read lock
-                    TopologyManager.releaseReadLock();
+                    //TopologyManager.releaseReadLock();
+                    
TopologyManager.releaseReadLockForCluster(clustercreatedEvent.getServiceName(),
+                            clustercreatedEvent.getClusterId());
                 }
 
             }
@@ -137,14 +139,18 @@ public class StratosManagerTopologyEventReceiver 
implements Runnable {
 
                 String serviceType = instanceSpawnedEvent.getServiceName();
                 //acquire read lock
-                TopologyManager.acquireReadLock();
+                //TopologyManager.acquireReadLock();
+                
TopologyManager.acquireReadLockForCluster(instanceSpawnedEvent.getServiceName(),
+                        instanceSpawnedEvent.getClusterId());
 
                 try {
                     Cluster cluster = 
TopologyManager.getTopology().getService(serviceType).getCluster(clusterDomain);
                     
TopologyClusterInformationModel.getInstance().addCluster(cluster);
                 } finally {
                     //release read lock
-                    TopologyManager.releaseReadLock();
+                    //TopologyManager.releaseReadLock();
+                    
TopologyManager.releaseReadLockForCluster(instanceSpawnedEvent.getServiceName(),
+                            instanceSpawnedEvent.getClusterId());
                 }
             }
         });
@@ -162,14 +168,18 @@ public class StratosManagerTopologyEventReceiver 
implements Runnable {
 
                 String serviceType = memberStartedEvent.getServiceName();
                 //acquire read lock
-                TopologyManager.acquireReadLock();
+                //TopologyManager.acquireReadLock();
+                
TopologyManager.acquireReadLockForCluster(memberStartedEvent.getServiceName(),
+                        memberStartedEvent.getClusterId());
 
                 try {
                     Cluster cluster = 
TopologyManager.getTopology().getService(serviceType).getCluster(clusterDomain);
                     
TopologyClusterInformationModel.getInstance().addCluster(cluster);
                 } finally {
                     //release read lock
-                    TopologyManager.releaseReadLock();
+                    //TopologyManager.releaseReadLock();
+                    
TopologyManager.releaseReadLockForCluster(memberStartedEvent.getServiceName(),
+                            memberStartedEvent.getClusterId());
                 }
 
             }
@@ -188,14 +198,18 @@ public class StratosManagerTopologyEventReceiver 
implements Runnable {
 
                 String serviceType = memberActivatedEvent.getServiceName();
                 //acquire read lock
-                TopologyManager.acquireReadLock();
+                //TopologyManager.acquireReadLock();
+                
TopologyManager.acquireReadLockForCluster(memberActivatedEvent.getServiceName(),
+                        memberActivatedEvent.getClusterId());
 
                 try {
                     Cluster cluster = 
TopologyManager.getTopology().getService(serviceType).getCluster(clusterDomain);
                     
TopologyClusterInformationModel.getInstance().addCluster(cluster);
                 } finally {
                     //release read lock
-                    TopologyManager.releaseReadLock();
+                    //TopologyManager.releaseReadLock();
+                    
TopologyManager.releaseReadLockForCluster(memberActivatedEvent.getServiceName(),
+                            memberActivatedEvent.getClusterId());
                 }
             }
         });
@@ -213,7 +227,9 @@ public class StratosManagerTopologyEventReceiver implements 
Runnable {
 
                 String serviceType = memberSuspendedEvent.getServiceName();
                 //acquire read lock
-                TopologyManager.acquireReadLock();
+                //TopologyManager.acquireReadLock();
+                
TopologyManager.acquireReadLockForCluster(memberSuspendedEvent.getServiceName(),
+                        memberSuspendedEvent.getClusterId());
 
                 try {
                     Cluster cluster = 
TopologyManager.getTopology().getService(serviceType).getCluster(clusterDomain);
@@ -221,7 +237,9 @@ public class StratosManagerTopologyEventReceiver implements 
Runnable {
 
                 } finally {
                     //release read lock
-                    TopologyManager.releaseReadLock();
+                    //TopologyManager.releaseReadLock();
+                    
TopologyManager.releaseReadLockForCluster(memberSuspendedEvent.getServiceName(),
+                            memberSuspendedEvent.getClusterId());
                 }
             }
         });
@@ -239,7 +257,9 @@ public class StratosManagerTopologyEventReceiver implements 
Runnable {
 
                 String serviceType = memberTerminatedEvent.getServiceName();
                 //acquire read lock
-                TopologyManager.acquireReadLock();
+                //TopologyManager.acquireReadLock();
+                
TopologyManager.acquireReadLockForCluster(memberTerminatedEvent.getServiceName(),
+                        memberTerminatedEvent.getClusterId());
 
                 try {
                     Cluster cluster = 
TopologyManager.getTopology().getService(serviceType).getCluster(clusterDomain);
@@ -247,8 +267,12 @@ public class StratosManagerTopologyEventReceiver 
implements Runnable {
                     // check and remove terminated member
                     if 
(cluster.memberExists(memberTerminatedEvent.getMemberId())) {
                         // release the read lock and acquire the write lock
-                        TopologyManager.releaseReadLock();
-                        TopologyManager.acquireWriteLock();
+//                        TopologyManager.releaseReadLock();
+//                        TopologyManager.acquireWriteLock();
+                        
TopologyManager.releaseReadLockForCluster(memberTerminatedEvent.getServiceName(),
+                                memberTerminatedEvent.getClusterId());
+                        
TopologyManager.acquireWriteLockForCluster(memberTerminatedEvent.getServiceName(),
+                                memberTerminatedEvent.getClusterId());
 
                         try {
                             // re-check the state; another thread might have 
acquired the write lock and modified
@@ -263,17 +287,23 @@ public class StratosManagerTopologyEventReceiver 
implements Runnable {
 
                             // downgrade to read lock - 1. acquire read lock, 
2. release write lock
                             // acquire read lock
-                            TopologyManager.acquireReadLock();
+                            //TopologyManager.acquireReadLock();
+                            
TopologyManager.acquireReadLockForCluster(memberTerminatedEvent.getServiceName(),
+                                    memberTerminatedEvent.getClusterId());
 
                         } finally {
                             // release the write lock
-                            TopologyManager.releaseWriteLock();
+                           // TopologyManager.releaseWriteLock();
+                            
TopologyManager.releaseWriteLockForCluster(memberTerminatedEvent.getServiceName(),
+                                    memberTerminatedEvent.getClusterId());
                         }
                     }
                     
TopologyClusterInformationModel.getInstance().addCluster(cluster);
                 } finally {
                     //release read lock
-                    TopologyManager.releaseReadLock();
+                    //TopologyManager.releaseReadLock();
+                    
TopologyManager.releaseReadLockForCluster(memberTerminatedEvent.getServiceName(),
+                            memberTerminatedEvent.getClusterId());
                 }
             }
         });
@@ -288,7 +318,8 @@ public class StratosManagerTopologyEventReceiver implements 
Runnable {
                 log.info("[ApplicationCreatedEventListener] Received: " + 
event.getClass());
 
                 try {
-                    TopologyManager.acquireReadLock();
+                    //TopologyManager.acquireReadLock();
+                    
TopologyManager.acquireReadLockForApplication(appCreateEvent.getApplication().getId());
                     
                     // create and persist Application subscritpion
                     CartridgeSubscriptionManager cartridgeSubscriptionManager 
= new CartridgeSubscriptionManager();
@@ -318,7 +349,8 @@ public class StratosManagerTopologyEventReceiver implements 
Runnable {
                        PrivilegedCarbonContext.endTenantFlow();
                     }
                 } finally {
-                    TopologyManager.releaseReadLock();
+                    //TopologyManager.releaseReadLock();
+                    
TopologyManager.releaseReadLockForApplication(appCreateEvent.getApplication().getId());
                 }
             }
         });
@@ -333,7 +365,8 @@ public class StratosManagerTopologyEventReceiver implements 
Runnable {
                 log.info("[ApplicationRemovedEventListener] Received: " + 
event.getClass());
 
                 try {
-                    TopologyManager.acquireReadLock();
+                    //TopologyManager.acquireReadLock();
+                    
TopologyManager.acquireReadLockForApplication(appRemovedEvent.getApplicationId());
                     
                     // create and persist Application subscritpion
                     CartridgeSubscriptionManager cartridgeSubscriptionManager 
= new CartridgeSubscriptionManager();
@@ -360,7 +393,8 @@ public class StratosManagerTopologyEventReceiver implements 
Runnable {
                        PrivilegedCarbonContext.endTenantFlow();
                     }
                 } finally {
-                    TopologyManager.releaseReadLock();
+                    //TopologyManager.releaseReadLock();
+                    
TopologyManager.releaseReadLockForApplication(appRemovedEvent.getApplicationId());
                 }
             }
         });

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java
index 46d46d4..d03cfda 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java
@@ -19,6 +19,9 @@
 
 package org.apache.stratos.messaging.domain.topology;
 
+import org.apache.stratos.messaging.domain.topology.locking.TopologyLock;
+import 
org.apache.stratos.messaging.domain.topology.locking.TopologyLockHierarchy;
+
 import java.io.Serializable;
 import java.util.*;
 
@@ -59,14 +62,18 @@ public class Service implements Serializable{
 
     public void addCluster(Cluster cluster) {
         this.clusterIdClusterMap.put(cluster.getClusterId(), cluster);
+        
TopologyLockHierarchy.getInstance().addClusterLock(cluster.getClusterId(), new 
TopologyLock());
     }
 
     public void removeCluster(Cluster cluster) {
         this.clusterIdClusterMap.remove(cluster.getClusterId());
+        
TopologyLockHierarchy.getInstance().removeTopologyLockForCluster(cluster.getClusterId());
     }
 
     public Cluster removeCluster(String clusterId) {
-        return this.clusterIdClusterMap.remove(clusterId);
+        Cluster removedCluster = this.clusterIdClusterMap.remove(clusterId);
+        
TopologyLockHierarchy.getInstance().removeTopologyLockForCluster(clusterId);
+        return removedCluster;
     }
 
     public boolean clusterExists(String clusterId) {

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Topology.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Topology.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Topology.java
index f8b535f..dabf611 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Topology.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Topology.java
@@ -26,8 +26,8 @@ import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import 
org.apache.stratos.messaging.domain.topology.util.CompositeApplicationBuilder;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+import org.apache.stratos.messaging.domain.topology.locking.TopologyLock;
+import 
org.apache.stratos.messaging.domain.topology.locking.TopologyLockHierarchy;
 
 /**
  * Defines a topology of serviceMap in Stratos.
@@ -54,6 +54,7 @@ public class Topology implements Serializable {
 
     public void addApplication (Application application) {
         this.applicationMap.put(application.getId(), application);
+        
TopologyLockHierarchy.getInstance().addApplicationLock(application.getId(), new 
TopologyLock());
     }
 
     public Application getApplication (String applicationId) {
@@ -62,6 +63,7 @@ public class Topology implements Serializable {
 
     public void removeApplication (String applicationId) {
         applicationMap.remove(applicationId);
+        
TopologyLockHierarchy.getInstance().removeTopologyLockForApplication(applicationId);
     }
 
     public Collection<Application> getApplications () {
@@ -78,9 +80,10 @@ public class Topology implements Serializable {
 
     public void addService(Service service) {
         this.serviceMap.put(service.getServiceName(), service);
+        
TopologyLockHierarchy.getInstance().addServiceLock(service.getServiceName(), 
new TopologyLock());
     }
 
-    public void addServices(Collection<Service> services) {
+    public synchronized void addServices(Collection<Service> services) {
         for (Service service : services) {
             addService(service);
         }
@@ -88,10 +91,12 @@ public class Topology implements Serializable {
 
     public void removeService(Service service) {
         this.serviceMap.remove(service.getServiceName());
+        
TopologyLockHierarchy.getInstance().removeTopologyLockForService(service.getServiceName());
     }
 
     public void removeService(String serviceName) {
         this.serviceMap.remove(serviceName);
+        
TopologyLockHierarchy.getInstance().removeTopologyLockForService(serviceName);
     }
 
     public Service getService(String serviceName) {

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/locking/TopologyLock.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/locking/TopologyLock.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/locking/TopologyLock.java
new file mode 100644
index 0000000..e1b90ad
--- /dev/null
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/locking/TopologyLock.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.domain.topology.locking;
+
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Read/write lock guarding one element of the topology lock hierarchy.
+ *
+ * Wraps a {@link ReentrantReadWriteLock} constructed with the fairness flag set to
+ * {@code true}. Both release methods are no-ops when the calling thread does not hold
+ * the corresponding lock, so they can be invoked unconditionally from {@code finally}
+ * blocks without masking the exception that is already propagating.
+ */
+public class TopologyLock {
+
+    private final ReentrantReadWriteLock lock;
+
+    /**
+     * Creates a lock that is initially held by no thread.
+     */
+    public TopologyLock () {
+        lock = new ReentrantReadWriteLock(true);
+    }
+
+    /**
+     * Blocks until the calling thread holds the write lock.
+     */
+    public void acquireWriteLock() {
+        lock.writeLock().lock();
+    }
+
+    /**
+     * Releases one hold of the write lock; does nothing if the calling thread
+     * does not currently hold it.
+     */
+    public void releaseWritelock() {
+        if (lock.isWriteLockedByCurrentThread()) {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Blocks until the calling thread holds the read lock.
+     */
+    public void acquireReadLock() {
+        lock.readLock().lock();
+    }
+
+    /**
+     * Releases one hold of the read lock; does nothing if the calling thread
+     * does not currently hold it.
+     *
+     * The guard mirrors {@link #releaseWritelock()}: callers that temporarily drop the
+     * read lock to take the write lock may reach their outer {@code finally} without a
+     * read hold, and an unguarded unlock would then throw IllegalMonitorStateException.
+     */
+    public void releaseReadLock() {
+        if (lock.getReadHoldCount() > 0) {
+            lock.readLock().unlock();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/locking/TopologyLockHierarchy.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/locking/TopologyLockHierarchy.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/locking/TopologyLockHierarchy.java
new file mode 100644
index 0000000..aa0f7a1
--- /dev/null
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/locking/TopologyLockHierarchy.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.domain.topology.locking;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Process-wide registry of {@link TopologyLock} instances.
+ *
+ * Holds one lock for the set of Services, one for the set of Applications, and one
+ * lock per Service name, Application id and Cluster id. Per-element locks are added
+ * and removed through the methods below; lookups return {@code null} when no lock is
+ * registered under the given key.
+ */
+public class TopologyLockHierarchy {
+
+    private static final Log log = LogFactory.getLog(TopologyLockHierarchy.class);
+
+    // lock for Services
+    private final TopologyLock serviceLock;
+
+    // lock for Applications
+    private final TopologyLock applicationLock;
+
+    // key = Service.name
+    private final ConcurrentHashMap<String, TopologyLock> serviceNameToTopologyLockMap;
+
+    // key = Application.id
+    private final ConcurrentHashMap<String, TopologyLock> applicationIdToTopologyLockMap;
+
+    // key = Cluster.id
+    private final ConcurrentHashMap<String, TopologyLock> clusterIdToTopologyLockMap;
+
+    private static volatile TopologyLockHierarchy topologyLockHierarchy;
+
+    private TopologyLockHierarchy () {
+
+        this.serviceLock = new TopologyLock();
+        this.applicationLock = new TopologyLock();
+        this.serviceNameToTopologyLockMap = new ConcurrentHashMap<String, TopologyLock>();
+        this.applicationIdToTopologyLockMap = new ConcurrentHashMap<String, TopologyLock>();
+        this.clusterIdToTopologyLockMap = new ConcurrentHashMap<String, TopologyLock>();
+    }
+
+    /**
+     * Returns the single shared instance, creating it on first use.
+     *
+     * @return the shared lock hierarchy
+     */
+    public static TopologyLockHierarchy getInstance () {
+
+        if (topologyLockHierarchy == null) {
+            synchronized (TopologyLockHierarchy.class) {
+                if (topologyLockHierarchy == null) {
+                    topologyLockHierarchy = new TopologyLockHierarchy();
+                }
+            }
+        }
+
+        return topologyLockHierarchy;
+    }
+
+    /**
+     * Registers a lock for an Application unless one is already registered.
+     *
+     * @param appId        Application id used as the key
+     * @param topologyLock lock to register
+     */
+    public void addApplicationLock (String appId, final TopologyLock topologyLock) {
+        registerLock(applicationIdToTopologyLockMap, "Application", appId, topologyLock);
+    }
+
+    /**
+     * @param appId Application id
+     * @return the registered lock, or {@code null} if none exists for the id
+     */
+    public TopologyLock getTopologyLockForApplication (String appId) {
+        return applicationIdToTopologyLockMap.get(appId);
+    }
+
+    /**
+     * Registers a lock for a Service unless one is already registered.
+     *
+     * @param serviceName  Service name used as the key
+     * @param topologyLock lock to register
+     */
+    public void addServiceLock (String serviceName, final TopologyLock topologyLock) {
+        registerLock(serviceNameToTopologyLockMap, "Service", serviceName, topologyLock);
+    }
+
+    /**
+     * @param serviceName Service name
+     * @return the registered lock, or {@code null} if none exists for the name
+     */
+    public TopologyLock getTopologyLockForService (String serviceName) {
+        return serviceNameToTopologyLockMap.get(serviceName);
+    }
+
+    /**
+     * Registers a lock for a Cluster unless one is already registered.
+     *
+     * @param clusterId    Cluster id used as the key
+     * @param topologyLock lock to register
+     */
+    public void addClusterLock (String clusterId, final TopologyLock topologyLock) {
+        registerLock(clusterIdToTopologyLockMap, "Cluster", clusterId, topologyLock);
+    }
+
+    /**
+     * @param clusterId Cluster id
+     * @return the registered lock, or {@code null} if none exists for the id
+     */
+    public TopologyLock getTopologyLockForCluster (String clusterId) {
+        return clusterIdToTopologyLockMap.get(clusterId);
+    }
+
+    /**
+     * Removes the lock registered for the given Application id, if any.
+     *
+     * @param appId Application id
+     */
+    public void removeTopologyLockForApplication (String appId) {
+        applicationIdToTopologyLockMap.remove(appId);
+        log.info("Removed lock for Application " + appId);
+    }
+
+    /**
+     * Removes the lock registered for the given Service name, if any.
+     *
+     * @param serviceName Service name
+     */
+    public void removeTopologyLockForService (String serviceName) {
+        serviceNameToTopologyLockMap.remove(serviceName);
+        log.info("Removed lock for Service " + serviceName);
+    }
+
+    /**
+     * Removes the lock registered for the given Cluster id, if any.
+     *
+     * @param clusterId Cluster id
+     */
+    public void removeTopologyLockForCluster (String clusterId) {
+        clusterIdToTopologyLockMap.remove(clusterId);
+        log.info("Removed lock for Cluster " + clusterId);
+    }
+
+    /**
+     * @return the lock covering the set of Services as a whole
+     */
+    public TopologyLock getServiceLock() {
+        return serviceLock;
+    }
+
+    /**
+     * Accessor name keeps its historical spelling so existing callers still compile.
+     *
+     * @return the lock covering the set of Applications as a whole
+     */
+    public TopologyLock getApplicatioLock() {
+        return applicationLock;
+    }
+
+    /**
+     * Atomically stores {@code topologyLock} under {@code key} if the key is free.
+     *
+     * A single putIfAbsent replaces the earlier containsKey/synchronized/put sequence:
+     * removals never took that monitor, so it gave no extra protection, and a thread
+     * that lost the race inside the synchronized block returned without any log entry.
+     *
+     * @param locks        map to register into
+     * @param elementType  element kind used in the log message
+     * @param key          Service name, Application id or Cluster id
+     * @param topologyLock lock to register
+     */
+    private static void registerLock (ConcurrentHashMap<String, TopologyLock> locks, String elementType,
+                                      String key, TopologyLock topologyLock) {
+
+        if (locks.putIfAbsent(key, topologyLock) == null) {
+            log.info("Added lock for " + elementType + " " + key);
+        } else {
+            log.warn("Topology Lock for " + elementType + " " + key + " already exists");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationActivatedMessageProcessor.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationActivatedMessageProcessor.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationActivatedMessageProcessor.java
index 8c1e66b..803a871 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationActivatedMessageProcessor.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationActivatedMessageProcessor.java
@@ -25,6 +25,7 @@ import org.apache.stratos.messaging.domain.topology.Status;
 import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.topology.ApplicationActivatedEvent;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 import org.apache.stratos.messaging.util.Util;
 
 /**
@@ -56,26 +57,16 @@ public class ApplicationActivatedMessageProcessor extends 
MessageProcessor {
             ApplicationActivatedEvent event = (ApplicationActivatedEvent) Util.
                     jsonToObject(message, ApplicationActivatedEvent.class);
 
-            // Validate event against the existing topology
-            Application application = 
topology.getApplication(event.getAppId());
-            if (application == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Application does not exist: 
[service] %s",
-                            event.getAppId()));
-                }
-                return false;
-            } else {
-                // Apply changes to the topology
-                application.setStatus(Status.Activated);
-                if (log.isInfoEnabled()) {
-                    log.info(String.format("Application updated as activated : 
%s",
-                            application.toString()));
-                }
-            }
+            TopologyManager.acquireReadLockForApplications();
+            TopologyManager.acquireWriteLockForApplication(event.getAppId());
 
-            // Notify event listeners
-            notifyEventListeners(event);
-            return true;
+            try {
+                return doProcess(event, topology);
+
+            } finally {
+                
TopologyManager.releaseWriteLockForApplication(event.getAppId());
+                TopologyManager.releaseReadLockForApplications();
+            }
 
         } else {
             if (nextProcessor != null) {
@@ -86,4 +77,29 @@ public class ApplicationActivatedMessageProcessor extends 
MessageProcessor {
             }
         }
     }
+
+    private boolean doProcess (ApplicationActivatedEvent event, Topology 
topology) {
+
+        // Validate event against the existing topology
+        Application application = topology.getApplication(event.getAppId());
+        if (application == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Application does not exist: [service] 
%s",
+                        event.getAppId()));
+            }
+            return false;
+        } else {
+            // Apply changes to the topology
+            application.setStatus(Status.Activated);
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Application updated as activated : %s",
+                        application.toString()));
+            }
+        }
+
+        // Notify event listeners
+        notifyEventListeners(event);
+        return true;
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationCreatedMessageProcessor.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationCreatedMessageProcessor.java
index f02e4be..4368bd7 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationCreatedMessageProcessor.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationCreatedMessageProcessor.java
@@ -25,6 +25,7 @@ import org.apache.stratos.messaging.domain.topology.Cluster;
 import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.topology.ApplicationCreatedEvent;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 import org.apache.stratos.messaging.util.Util;
 
 public class ApplicationCreatedMessageProcessor extends MessageProcessor {
@@ -47,40 +48,21 @@ public class ApplicationCreatedMessageProcessor extends MessageProcessor {
                 return false;
             }
 
-            ApplicationCreatedEvent appCreatedEvent = (ApplicationCreatedEvent) Util.jsonToObject(message, ApplicationCreatedEvent.class);
-            if (appCreatedEvent == null) {
+            ApplicationCreatedEvent event = (ApplicationCreatedEvent) Util.jsonToObject(message, ApplicationCreatedEvent.class);
+            if (event == null) {
                 log.error("Unable to convert the JSON message to ApplicationCreatedEvent");
                 return false;
             }
 
-            // check if required properties are available
-            if (appCreatedEvent.getApplication() == null) {
-                String errorMsg = "Application object of application created event is invalid";
-                log.error(errorMsg);
-                throw new RuntimeException(errorMsg);
-            }
-
-            if (appCreatedEvent.getApplication().getId() == null || appCreatedEvent.getApplication().getId().isEmpty()) {
-                String errorMsg = "App id of application created event is invalid: [ " + appCreatedEvent.getApplication().getId() + " ]";
-                log.error(errorMsg);
-                throw new RuntimeException(errorMsg);
-            }
+            TopologyManager.acquireWriteLockForApplications();
 
-            // check if an Application with same name exists in topology
-            if (topology.applicationExists(appCreatedEvent.getApplication().getId())) {
-                log.warn("Application with id [ " + appCreatedEvent.getApplication().getId() + " ] already exists in Topology");
+            try {
+                return doProcess(event, topology);
 
-            } else {
-                // add application and the clusters to Topology
-                for(Cluster cluster: appCreatedEvent.getClusterList()) {
-                    topology.getService(cluster.getServiceName()).addCluster(cluster);
-                }
-                topology.addApplication(appCreatedEvent.getApplication());
+            } finally {
+                TopologyManager.releaseWriteLockForApplications();
             }
 
-            notifyEventListeners(appCreatedEvent);
-            return true;
-
         } else {
             if (nextProcessor != null) {
                 // ask the next processor to take care of the message.
@@ -90,4 +72,35 @@ public class ApplicationCreatedMessageProcessor extends MessageProcessor {
             }
         }
     }
+
+    private boolean doProcess (ApplicationCreatedEvent event,Topology topology) {
+
+        // check if required properties are available
+        if (event.getApplication() == null) {
+            String errorMsg = "Application object of application created event is invalid";
+            log.error(errorMsg);
+            throw new RuntimeException(errorMsg);
+        }
+
+        if (event.getApplication().getId() == null || event.getApplication().getId().isEmpty()) {
+            String errorMsg = "App id of application created event is invalid: [ " + event.getApplication().getId() + " ]";
+            log.error(errorMsg);
+            throw new RuntimeException(errorMsg);
+        }
+
+        // check if an Application with same name exists in topology
+        if (topology.applicationExists(event.getApplication().getId())) {
+            log.warn("Application with id [ " + event.getApplication().getId() + " ] already exists in Topology");
+
+        } else {
+            // add application and the clusters to Topology
+            for(Cluster cluster: event.getClusterList()) {
+                topology.getService(cluster.getServiceName()).addCluster(cluster);
+            }
+            topology.addApplication(event.getApplication());
+        }
+
+        notifyEventListeners(event);
+        return true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationRemovedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationRemovedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationRemovedMessageProcessor.java
index c9dbb07..ed6c2d4 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationRemovedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationRemovedMessageProcessor.java
@@ -22,9 +22,9 @@ package org.apache.stratos.messaging.message.processor.topology;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.messaging.domain.topology.Topology;
-import org.apache.stratos.messaging.event.topology.ApplicationCreatedEvent;
 import org.apache.stratos.messaging.event.topology.ApplicationRemovedEvent;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 import org.apache.stratos.messaging.util.Util;
 
 public class ApplicationRemovedMessageProcessor extends MessageProcessor {
@@ -55,38 +55,20 @@ public class ApplicationRemovedMessageProcessor extends MessageProcessor {
                    return false;
                }
        
-               ApplicationRemovedEvent appRemovedEvent = (ApplicationRemovedEvent) Util.jsonToObject(message, ApplicationRemovedEvent.class);
-               if (appRemovedEvent == null) {
+               ApplicationRemovedEvent event = (ApplicationRemovedEvent) Util.jsonToObject(message, ApplicationRemovedEvent.class);
+               if (event == null) {
                    log.error("Unable to convert the JSON message to ApplicationCreatedEvent");
                    return false;
                }
-               
-            // check if required properties are available
-               if (appRemovedEvent.getApplicationId() == null) {
-                   String errorMsg = "Application Id of application removed event is invalid";
-                   log.error(errorMsg);
-                   throw new RuntimeException(errorMsg);
-               }
-               
-               if (appRemovedEvent.getTenantDomain()== null) {
-                   String errorMsg = "Application tenant domain of application removed event is invalid";
-                   log.error(errorMsg);
-                   throw new RuntimeException(errorMsg);
-               }
-               
-               // check if an Application with same name exists in topology
-               String appId = appRemovedEvent.getApplicationId();
-            if (topology.applicationExists(appId)) {
-                log.warn("Application with id [ " + appId + " ] still exists in Topology, removing it");
-                topology.removeApplication(appId);
-            } 
-               
-            if (log.isDebugEnabled()) {
-                       log.debug("ApplicationRemovedMessageProcessor notifying listener " + object);
-               }
-            
-               notifyEventListeners(appRemovedEvent);
-               return true;
+
+            TopologyManager.acquireWriteLockForApplications();
+
+            try {
+                return doProcess(event, topology);
+
+            } finally {
+                TopologyManager.releaseWriteLockForApplications();
+            }
         
            } else {
                if (nextProcessor != null) {
@@ -96,6 +78,35 @@ public class ApplicationRemovedMessageProcessor extends MessageProcessor {
                    throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
                }
            }
-        
+    }
+
+    private boolean doProcess (ApplicationRemovedEvent event, Topology topology) {
+
+        // check if required properties are available
+        if (event.getApplicationId() == null) {
+            String errorMsg = "Application Id of application removed event is invalid";
+            log.error(errorMsg);
+            throw new RuntimeException(errorMsg);
+        }
+
+        if (event.getTenantDomain()== null) {
+            String errorMsg = "Application tenant domain of application removed event is invalid";
+            log.error(errorMsg);
+            throw new RuntimeException(errorMsg);
+        }
+
+        // check if an Application with same name exists in topology
+        String appId = event.getApplicationId();
+        if (topology.applicationExists(appId)) {
+            log.warn("Application with id [ " + appId + " ] still exists in Topology, removing it");
+            topology.removeApplication(appId);
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("ApplicationRemovedMessageProcessor notifying listener ");
+        }
+
+        notifyEventListeners(event);
+        return true;
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterActivatedProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterActivatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterActivatedProcessor.java
index 52de45b..78f772b 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterActivatedProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterActivatedProcessor.java
@@ -25,10 +25,10 @@ import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.domain.topology.Status;
 import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.topology.ClusterActivatedEvent;
-import org.apache.stratos.messaging.event.topology.ClusterMaintenanceModeEvent;
 import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 import org.apache.stratos.messaging.util.Util;
 
 /**
@@ -45,75 +45,91 @@ public class ClusterActivatedProcessor extends MessageProcessor {
 
     @Override
     public boolean process(String type, String message, Object object) {
+
         Topology topology = (Topology) object;
 
         if (ClusterActivatedEvent.class.getName().equals(type)) {
             // Return if topology has not been initialized
-            if (!topology.isInitialized())
+            if (!topology.isInitialized()) {
                 return false;
+            }
 
             // Parse complete message and build event
             ClusterActivatedEvent event = (ClusterActivatedEvent) Util.
                     jsonToObject(message, ClusterActivatedEvent.class);
 
-            // Apply service filter
-            if (TopologyServiceFilter.getInstance().isActive()) {
-                if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
-                    // Service is excluded, do not update topology or fire event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
-                    }
-                    return false;
-                }
+            TopologyManager.acquireReadLockForServices();
+            TopologyManager.acquireWriteLockForService(event.getServiceName());
+            try {
+                return doProcess(event, topology);
+
+            } finally {
+                TopologyManager.releaseWriteLockForService(event.getServiceName());
+                TopologyManager.releaseReadLockForServices();
             }
 
-            // Apply cluster filter
-            if (TopologyClusterFilter.getInstance().isActive()) {
-                if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
-                    // Cluster is excluded, do not update topology or fire event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
-                    }
-                    return false;
-                }
+        }  else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
             }
+        }
+    }
+
+    private boolean doProcess (ClusterActivatedEvent event,Topology topology) {
 
-            // Validate event against the existing topology
-            Service service = topology.getService(event.getServiceName());
-            if (service == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Service does not exist: [service] %s",
-                            event.getServiceName()));
+        // Apply service filter
+        if (TopologyServiceFilter.getInstance().isActive()) {
+            if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+                // Service is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
                 }
                 return false;
             }
-            Cluster cluster = service.getCluster(event.getClusterId());
+        }
 
-            if (cluster == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Cluster not exists in service: [service] %s [cluster] %s", event.getServiceName(),
-                            event.getClusterId()));
-                }
-            } else {
-                // Apply changes to the topology
-                cluster.setStatus(Status.Activated);
-                if (log.isInfoEnabled()) {
-                    log.info(String.format("Cluster updated as activated : %s",
-                            cluster.toString()));
+        // Apply cluster filter
+        if (TopologyClusterFilter.getInstance().isActive()) {
+            if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+                // Cluster is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
                 }
+                return false;
             }
+        }
 
-            // Notify event listeners
-            notifyEventListeners(event);
-            return true;
+        // Validate event against the existing topology
+        Service service = topology.getService(event.getServiceName());
+        if (service == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Service does not exist: [service] %s",
+                        event.getServiceName()));
+            }
+            return false;
+        }
+        Cluster cluster = service.getCluster(event.getClusterId());
 
+        if (cluster == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Cluster not exists in service: [service] %s [cluster] %s", event.getServiceName(),
+                        event.getClusterId()));
+            }
         } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            // Apply changes to the topology
+            cluster.setStatus(Status.Activated);
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Cluster updated as activated : %s",
+                        cluster.toString()));
             }
         }
+
+        // Notify event listeners
+        notifyEventListeners(event);
+        return true;
     }
+
 }

Reply via email to