Repository: stratos Updated Branches: refs/heads/4.0.0-grouping 10ed5ef2f -> 0b15aa057
using hierarchical locking correctly in Stratos Topology event listeners Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/0b15aa05 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/0b15aa05 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/0b15aa05 Branch: refs/heads/4.0.0-grouping Commit: 0b15aa05733477494c9c94b757657094bf863cff Parents: 10ed5ef Author: Isuru Haththotuwa <[email protected]> Authored: Thu Oct 9 11:25:38 2014 +0530 Committer: Isuru Haththotuwa <[email protected]> Committed: Thu Oct 9 11:25:44 2014 +0530 ---------------------------------------------------------------------- .../AutoscalerTopologyEventReceiver.java | 4 ++-- .../LoadBalancerTopologyEventReceiver.java | 24 ++++++++++---------- .../ApplicationCreatedMessageProcessor.java | 1 + .../topology/ClusterActivatedProcessor.java | 4 ++-- .../ClusterMaintenanceModeMessageProcessor.java | 4 ++-- .../CompleteTopologyMessageProcessor.java | 6 ++++- 6 files changed, 24 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/0b15aa05/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java index b7d671f..4a27c81 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java @@ -300,7 +300,7 @@ public class AutoscalerTopologyEventReceiver implements Runnable { log.info("Event received: " + event); clusterMaitenanceEvent = (ClusterMaintenanceModeEvent) event; //TopologyManager.acquireReadLock(); - TopologyManager.acquireWriteLockForCluster(clusterMaitenanceEvent.getServiceName(), + TopologyManager.acquireReadLockForCluster(clusterMaitenanceEvent.getServiceName(), clusterMaitenanceEvent.getClusterId()); Service service = TopologyManager.getTopology().getService(clusterMaitenanceEvent.getServiceName()); @@ -319,7 +319,7 @@ public class AutoscalerTopologyEventReceiver implements Runnable { log.error("Error processing event", e); } finally { //TopologyManager.releaseReadLock(); - TopologyManager.releaseWriteLockForCluster(clusterMaitenanceEvent.getServiceName(), + TopologyManager.releaseReadLockForCluster(clusterMaitenanceEvent.getServiceName(), clusterMaitenanceEvent.getClusterId()); } } http://git-wip-us.apache.org/repos/asf/stratos/blob/0b15aa05/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyEventReceiver.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyEventReceiver.java index 4222075..f2b3072 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyEventReceiver.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyEventReceiver.java @@ -122,7 +122,7 @@ public class LoadBalancerTopologyEventReceiver implements Runnable { MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event; //TopologyManager.acquireReadLock(); - TopologyManager.acquireWriteLockForCluster(memberActivatedEvent.getServiceName(), + TopologyManager.acquireReadLockForCluster(memberActivatedEvent.getServiceName(), memberActivatedEvent.getClusterId()); try { @@ -173,7 +173,7 @@ public class LoadBalancerTopologyEventReceiver implements Runnable { log.error("Error processing event", e); } finally { //TopologyManager.releaseReadLock(); - TopologyManager.releaseWriteLockForCluster(memberActivatedEvent.getServiceName(), + TopologyManager.releaseReadLockForCluster(memberActivatedEvent.getServiceName(), memberActivatedEvent.getClusterId()); } } @@ -184,7 +184,7 @@ public class LoadBalancerTopologyEventReceiver implements Runnable { MemberMaintenanceModeEvent memberMaintenanceModeEvent = (MemberMaintenanceModeEvent) event; - TopologyManager.acquireWriteLockForCluster(memberMaintenanceModeEvent.getServiceName(), + TopologyManager.acquireReadLockForCluster(memberMaintenanceModeEvent.getServiceName(), memberMaintenanceModeEvent.getClusterId()); try { @@ -200,7 +200,7 @@ public class LoadBalancerTopologyEventReceiver implements Runnable { log.error("Error processing event", e); } finally { //TopologyManager.releaseReadLock(); - TopologyManager.releaseWriteLockForCluster(memberMaintenanceModeEvent.getServiceName(), + TopologyManager.releaseReadLockForCluster(memberMaintenanceModeEvent.getServiceName(), memberMaintenanceModeEvent.getClusterId()); } } @@ -210,7 +210,7 @@ public class LoadBalancerTopologyEventReceiver implements Runnable { protected void onEvent(Event event) { MemberSuspendedEvent memberSuspendedEvent = (MemberSuspendedEvent) event; - TopologyManager.acquireWriteLockForCluster(memberSuspendedEvent.getServiceName(), + TopologyManager.acquireReadLockForCluster(memberSuspendedEvent.getServiceName(), memberSuspendedEvent.getClusterId()); try { @@ -225,7 +225,7 @@ public class LoadBalancerTopologyEventReceiver implements Runnable { log.error("Error processing event", e); } finally { //TopologyManager.releaseReadLock(); - TopologyManager.releaseWriteLockForCluster(memberSuspendedEvent.getServiceName(), + TopologyManager.releaseReadLockForCluster(memberSuspendedEvent.getServiceName(), memberSuspendedEvent.getClusterId()); } } @@ -237,7 +237,7 @@ public class LoadBalancerTopologyEventReceiver implements Runnable { //TopologyManager.acquireReadLock(); MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event; - TopologyManager.acquireWriteLockForCluster(memberTerminatedEvent.getServiceName(), + TopologyManager.acquireReadLockForCluster(memberTerminatedEvent.getServiceName(), memberTerminatedEvent.getClusterId()); try { @@ -251,7 +251,7 @@ public class LoadBalancerTopologyEventReceiver implements Runnable { log.error("Error processing event", e); } finally { //TopologyManager.releaseReadLock(); - TopologyManager.releaseWriteLockForCluster(memberTerminatedEvent.getServiceName(), + TopologyManager.releaseReadLockForCluster(memberTerminatedEvent.getServiceName(), memberTerminatedEvent.getClusterId()); } } @@ -262,7 +262,7 @@ public class LoadBalancerTopologyEventReceiver implements Runnable { // Remove cluster from context ClusterRemovedEvent clusterRemovedEvent = (ClusterRemovedEvent) event; - TopologyManager.acquireWriteLockForCluster(clusterRemovedEvent.getServiceName(), + TopologyManager.acquireReadLockForCluster(clusterRemovedEvent.getServiceName(), clusterRemovedEvent.getClusterId()); try { @@ -283,7 +283,7 @@ public class LoadBalancerTopologyEventReceiver implements Runnable { log.error("Error processing event", e); } finally { //TopologyManager.releaseReadLock(); - TopologyManager.releaseWriteLockForCluster(clusterRemovedEvent.getServiceName(), + TopologyManager.releaseReadLockForCluster(clusterRemovedEvent.getServiceName(), clusterRemovedEvent.getClusterId()); } } @@ -293,7 +293,7 @@ public class LoadBalancerTopologyEventReceiver implements Runnable { protected void onEvent(Event event) { ServiceRemovedEvent serviceRemovedEvent = (ServiceRemovedEvent) event; - TopologyManager.acquireWriteLockForServices(); + TopologyManager.acquireReadLockForService(serviceRemovedEvent.getServiceName()); try { //TopologyManager.acquireReadLock(); @@ -317,7 +317,7 @@ public class LoadBalancerTopologyEventReceiver implements Runnable { log.error("Error processing event", e); } finally { //TopologyManager.releaseReadLock(); - TopologyManager.releaseWriteLockForServices(); + TopologyManager.releaseReadLockForService(serviceRemovedEvent.getServiceName()); } } }); http://git-wip-us.apache.org/repos/asf/stratos/blob/0b15aa05/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationCreatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationCreatedMessageProcessor.java index 34ed3ef..55786fa 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationCreatedMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationCreatedMessageProcessor.java @@ -58,6 +58,7 @@ public class ApplicationCreatedMessageProcessor extends MessageProcessor { } TopologyManager.acquireWriteLockForApplications(); + // since the Clusters will also get modified, acquire write locks for each Service Type Set<ClusterDataHolder> clusterDataHolders = event.getApplication().getClusterDataRecursively(); if (clusterDataHolders != null) { for (ClusterDataHolder clusterData : clusterDataHolders) { http://git-wip-us.apache.org/repos/asf/stratos/blob/0b15aa05/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterActivatedProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterActivatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterActivatedProcessor.java index 601cfb2..ef4bba2 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterActivatedProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterActivatedProcessor.java @@ -58,12 +58,12 @@ public class ClusterActivatedProcessor extends MessageProcessor { ClusterActivatedEvent event = (ClusterActivatedEvent) Util. jsonToObject(message, ClusterActivatedEvent.class); - TopologyManager.acquireWriteLockForService(event.getServiceName()); + TopologyManager.acquireWriteLockForCluster(event.getServiceName(), event.getClusterId()); try { return doProcess(event, topology); } finally { - TopologyManager.releaseWriteLockForService(event.getServiceName()); + TopologyManager.releaseWriteLockForCluster(event.getServiceName(), event.getClusterId()); } } else { http://git-wip-us.apache.org/repos/asf/stratos/blob/0b15aa05/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterMaintenanceModeMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterMaintenanceModeMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterMaintenanceModeMessageProcessor.java index 0b10504..15819ff 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterMaintenanceModeMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterMaintenanceModeMessageProcessor.java @@ -50,12 +50,12 @@ public class ClusterMaintenanceModeMessageProcessor extends MessageProcessor { ClusterMaintenanceModeEvent event = (ClusterMaintenanceModeEvent) Util. jsonToObject(message, ClusterMaintenanceModeEvent.class); - TopologyManager.acquireWriteLockForService(event.getServiceName()); + TopologyManager.acquireWriteLockForCluster(event.getServiceName(), event.getClusterId()); try { return doProcess(event, topology); } finally { - TopologyManager.releaseWriteLockForService(event.getServiceName()); + TopologyManager.releaseWriteLockForCluster(event.getServiceName(), event.getClusterId()); } } else { http://git-wip-us.apache.org/repos/asf/stratos/blob/0b15aa05/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java index 74daa48..999415d 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java @@ -55,7 +55,11 @@ public class CompleteTopologyMessageProcessor extends MessageProcessor { TopologyManager.acquireWriteLock(); try { - return doProcess(event, topology); + if (!topology.isInitialized()) { + return doProcess(event, topology); + } else { + return true; + } } finally { TopologyManager.releaseWriteLock();
