initial changes for hierarchical topology locking
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/4ace39c8 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/4ace39c8 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/4ace39c8 Branch: refs/heads/4.0.0-grouping Commit: 4ace39c85ab1f989e888dcc48afd2c1092ff245f Parents: 2536b30 Author: Isuru Haththotuwa <[email protected]> Authored: Tue Oct 7 10:21:48 2014 +0530 Committer: Isuru Haththotuwa <[email protected]> Committed: Tue Oct 7 19:08:54 2014 +0530 ---------------------------------------------------------------------- .../AutoscalerHealthStatEventReceiver.java | 18 +- .../AutoscalerTopologyEventReceiver.java | 151 ++++--- .../controller/topology/TopologyBuilder.java | 7 +- .../LoadBalancerTopologyEventReceiver.java | 77 +++- .../StratosManagerTopologyEventReceiver.java | 78 +++- .../messaging/domain/topology/Service.java | 9 +- .../messaging/domain/topology/Topology.java | 11 +- .../domain/topology/locking/TopologyLock.java | 49 +++ .../topology/locking/TopologyLockHierarchy.java | 147 +++++++ .../ApplicationActivatedMessageProcessor.java | 54 ++- .../ApplicationCreatedMessageProcessor.java | 65 +-- .../ApplicationRemovedMessageProcessor.java | 73 ++-- .../topology/ClusterActivatedProcessor.java | 108 +++-- .../ClusterCreatedMessageProcessor.java | 131 +++--- .../ClusterMaintenanceModeMessageProcessor.java | 104 +++-- .../ClusterRemovedMessageProcessor.java | 107 +++-- .../CompleteTopologyMessageProcessor.java | 200 +++++---- .../topology/GroupActivatedProcessor.java | 70 +-- .../InstanceSpawnedMessageProcessor.java | 150 ++++--- .../MemberActivatedMessageProcessor.java | 183 ++++---- .../MemberMaintenanceModeProcessor.java | 159 +++---- .../MemberReadyToShutdownMessageProcessor.java | 160 +++---- .../topology/MemberStartedMessageProcessor.java | 157 +++---- .../MemberSuspendedMessageProcessor.java | 155 +++---- .../MemberTerminatedMessageProcessor.java | 139 +++--- .../ServiceCreatedMessageProcessor.java | 74 ++-- .../ServiceRemovedMessageProcessor.java | 71 ++-- .../topology/TopologyMessageProcessorChain.java | 10 +- .../topology/TopologyEventMessageDelegator.java | 10 +- .../receiver/topology/TopologyManager.java | 425 ++++++++++++++++++- 30 files changed, 2070 insertions(+), 1082 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java index 26d3179..a1213f6 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java @@ -593,8 +593,11 @@ public class AutoscalerHealthStatEventReceiver implements Runnable { } private Member findMember(String memberId) { + + TopologyManager.acquireReadLockForServices(); + try { - TopologyManager.acquireReadLock(); + //TopologyManager.acquireReadLock(); for(Service service : TopologyManager.getTopology().getServices()) { for(Cluster cluster : service.getClusters()) { if(cluster.memberExists(memberId)) { @@ -605,7 +608,8 @@ public class AutoscalerHealthStatEventReceiver implements Runnable { return null; } finally { - TopologyManager.releaseReadLock(); + //TopologyManager.releaseReadLock(); + TopologyManager.releaseReadLockForServices(); } } @@ -613,8 +617,13 @@ public class AutoscalerHealthStatEventReceiver implements Runnable { try { AbstractClusterMonitor monitor = getMonitor(clusterId); NetworkPartitionContext nwPartitionCtxt; + + // TODO: the optimal way would be to add Service Name to member fault event and use to get a + // hierarchical lock using TopologyManager.acquireReadLockForCluster(serviceName, clusterid) + TopologyManager.acquireReadLockForServices(); + try{ - TopologyManager.acquireReadLock(); + //TopologyManager.acquireReadLock(); Member member = findMember(memberId); if(null == member){ @@ -634,7 +643,8 @@ public class AutoscalerHealthStatEventReceiver implements Runnable { } }finally{ - TopologyManager.releaseReadLock(); + //TopologyManager.releaseReadLock(); + TopologyManager.releaseReadLockForServices(); } // start a new member in the same Partition String partitionId = monitor.getPartitionOfMember(memberId); http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java index 3e1bfe2..4064510 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java @@ -90,22 +90,23 @@ public class AutoscalerTopologyEventReceiver implements Runnable { topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() { @Override protected void onEvent(Event event) { - try { + + if (!topologyInitialized) { TopologyManager.acquireReadLock(); - if(!topologyInitialized) { - topologyInitialized = true; + + try { for (Application application : TopologyManager.getTopology().getApplications()) { startApplicationMonitor(application); } + + topologyInitialized = true; + } catch (Exception e) { + log.error("Error processing event", e); + } finally { + TopologyManager.releaseReadLock(); } - } catch (Exception e) { - log.error("Error processing event", e); - } finally { - TopologyManager.releaseReadLock(); } } - - }); @@ -118,12 +119,19 @@ public class AutoscalerTopologyEventReceiver implements Runnable { ApplicationCreatedEvent applicationCreatedEvent = (ApplicationCreatedEvent) event; //acquire read lock - TopologyManager.acquireReadLock(); - //start the application monitor - //TODO catch exception by ApplicationMonitor - startApplicationMonitor(applicationCreatedEvent.getApplication()); - //release read lock - TopologyManager.releaseReadLock(); + //TopologyManager.acquireReadLock(); + TopologyManager.acquireReadLockForApplication(applicationCreatedEvent.getApplication().getId()); + + try { + //start the application monitor + //TODO catch exception by ApplicationMonitor + startApplicationMonitor(applicationCreatedEvent.getApplication()); + + } finally { + //release read lock + TopologyManager.releaseReadLockForApplication(applicationCreatedEvent.getApplication().getId()); + //TopologyManager.releaseReadLock(); + } } }); @@ -197,7 +205,8 @@ public class AutoscalerTopologyEventReceiver implements Runnable { ApplicationRemovedEvent applicationRemovedEvent = (ApplicationRemovedEvent) event; //acquire read lock - TopologyManager.acquireReadLock(); + //TopologyManager.acquireReadLock(); + TopologyManager.acquireReadLockForApplication(applicationRemovedEvent.getApplicationId()); try { //TODO remove monitors as well as any starting or pending threads @@ -222,7 +231,8 @@ public class AutoscalerTopologyEventReceiver implements Runnable { } finally { //release read lock - TopologyManager.releaseReadLock(); + TopologyManager.releaseReadLockForApplication(applicationRemovedEvent.getApplicationId()); + //TopologyManager.releaseReadLock(); } } @@ -283,26 +293,34 @@ public class AutoscalerTopologyEventReceiver implements Runnable { topologyEventReceiver.addEventListener(new ClusterMaintenanceModeEventListener() { @Override protected void onEvent(Event event) { + + ClusterMaintenanceModeEvent clusterMaitenanceEvent = null; + try { log.info("Event received: " + event); - ClusterMaintenanceModeEvent e = (ClusterMaintenanceModeEvent) event; - TopologyManager.acquireReadLock(); - Service service = TopologyManager.getTopology().getService(e.getServiceName()); - Cluster cluster = service.getCluster(e.getClusterId()); + clusterMaitenanceEvent = (ClusterMaintenanceModeEvent) event; + //TopologyManager.acquireReadLock(); + TopologyManager.acquireWriteLockForCluster(clusterMaitenanceEvent.getServiceName(), + clusterMaitenanceEvent.getClusterId()); + + Service service = TopologyManager.getTopology().getService(clusterMaitenanceEvent.getServiceName()); + Cluster cluster = service.getCluster(clusterMaitenanceEvent.getClusterId()); if (AutoscalerContext.getInstance().monitorExist((cluster.getClusterId()))) { - AutoscalerContext.getInstance().getMonitor(e.getClusterId()). - setStatus(e.getStatus()); + AutoscalerContext.getInstance().getMonitor(clusterMaitenanceEvent.getClusterId()). + setStatus(clusterMaitenanceEvent.getStatus()); } else if (AutoscalerContext.getInstance(). lbMonitorExist((cluster.getClusterId()))) { - AutoscalerContext.getInstance().getLBMonitor(e.getClusterId()). - setStatus(e.getStatus()); + AutoscalerContext.getInstance().getLBMonitor(clusterMaitenanceEvent.getClusterId()). + setStatus(clusterMaitenanceEvent.getStatus()); } else { log.error("cluster monitor not exists for the cluster: " + cluster.toString()); } } catch (Exception e) { log.error("Error processing event", e); } finally { - TopologyManager.releaseReadLock(); + //TopologyManager.releaseReadLock(); + TopologyManager.releaseWriteLockForCluster(clusterMaitenanceEvent.getServiceName(), + clusterMaitenanceEvent.getClusterId()); } } @@ -312,16 +330,20 @@ public class AutoscalerTopologyEventReceiver implements Runnable { topologyEventReceiver.addEventListener(new ClusterRemovedEventListener() { @Override protected void onEvent(Event event) { + + ClusterRemovedEvent clusterRemovedEvent = null; try { - ClusterRemovedEvent e = (ClusterRemovedEvent) event; - TopologyManager.acquireReadLock(); + clusterRemovedEvent = (ClusterRemovedEvent) event; + //TopologyManager.acquireReadLock(); + TopologyManager.acquireReadLockForCluster(clusterRemovedEvent.getServiceName(), + clusterRemovedEvent.getClusterId()); - String clusterId = e.getClusterId(); - String deploymentPolicy = e.getDeploymentPolicy(); + String clusterId = clusterRemovedEvent.getClusterId(); + String deploymentPolicy = clusterRemovedEvent.getDeploymentPolicy(); AbstractClusterMonitor monitor; - if (e.isLbCluster()) { + if (clusterRemovedEvent.isLbCluster()) { DeploymentPolicy depPolicy = PolicyManager.getInstance(). getDeploymentPolicy(deploymentPolicy); if (depPolicy != null) { @@ -362,7 +384,9 @@ public class AutoscalerTopologyEventReceiver implements Runnable { } catch (Exception e) { log.error("Error processing event", e); } finally { - TopologyManager.releaseReadLock(); + //TopologyManager.releaseReadLock(); + TopologyManager.releaseReadLockForCluster(clusterRemovedEvent.getServiceName(), + clusterRemovedEvent.getClusterId()); } } @@ -380,14 +404,19 @@ public class AutoscalerTopologyEventReceiver implements Runnable { @Override protected void onEvent(Event event) { + MemberTerminatedEvent memberTerminatedEvent = null; try { - TopologyManager.acquireReadLock(); - MemberTerminatedEvent e = (MemberTerminatedEvent) event; - String networkPartitionId = e.getNetworkPartitionId(); - String clusterId = e.getClusterId(); - String partitionId = e.getPartitionId(); + //TopologyManager.acquireReadLock(); + + memberTerminatedEvent = (MemberTerminatedEvent) event; + String networkPartitionId = memberTerminatedEvent.getNetworkPartitionId(); + String clusterId = memberTerminatedEvent.getClusterId(); + String partitionId = memberTerminatedEvent.getPartitionId(); AbstractClusterMonitor monitor; + TopologyManager.acquireReadLockForCluster(memberTerminatedEvent.getServiceName(), + memberTerminatedEvent.getClusterId()); + if (AutoscalerContext.getInstance().monitorExist(clusterId)) { monitor = AutoscalerContext.getInstance().getMonitor(clusterId); } else { @@ -400,7 +429,7 @@ public class AutoscalerTopologyEventReceiver implements Runnable { PartitionContext partitionContext = networkPartitionContext. getPartitionCtxt(partitionId); - String memberId = e.getMemberId(); + String memberId = memberTerminatedEvent.getMemberId(); partitionContext.removeMemberStatsContext(memberId); if (partitionContext.removeTerminationPendingMember(memberId)) { @@ -431,7 +460,9 @@ public class AutoscalerTopologyEventReceiver implements Runnable { } catch (Exception e) { log.error("Error processing event", e); } finally { - TopologyManager.releaseReadLock(); + //TopologyManager.releaseReadLock(); + TopologyManager.releaseReadLockForCluster(memberTerminatedEvent.getServiceName(), + memberTerminatedEvent.getClusterId()); } } @@ -441,14 +472,18 @@ public class AutoscalerTopologyEventReceiver implements Runnable { @Override protected void onEvent(Event event) { + MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event; + + //TopologyManager.acquireReadLock(); + TopologyManager.acquireReadLockForCluster(memberActivatedEvent.getServiceName(), + memberActivatedEvent.getClusterId()); + try { - TopologyManager.acquireReadLock(); - MemberActivatedEvent e = (MemberActivatedEvent) event; - String networkPartitionId = e.getNetworkPartitionId(); - String clusterId = e.getClusterId(); - String partitionId = e.getPartitionId(); - String memberId = e.getMemberId(); + String networkPartitionId = memberActivatedEvent.getNetworkPartitionId(); + String clusterId = memberActivatedEvent.getClusterId(); + String partitionId = memberActivatedEvent.getPartitionId(); + String memberId = memberActivatedEvent.getMemberId(); AbstractClusterMonitor monitor; @@ -476,12 +511,14 @@ public class AutoscalerTopologyEventReceiver implements Runnable { // partitionContext.incrementCurrentActiveMemberCount(1); partitionContext.movePendingMemberToActiveMembers(memberId); //triggering the status checker - StatusChecker.getInstance().onMemberStatusChange(e.getClusterId()); + StatusChecker.getInstance().onMemberStatusChange(memberActivatedEvent.getClusterId()); } catch (Exception e) { log.error("Error processing event", e); } finally { - TopologyManager.releaseReadLock(); + //TopologyManager.releaseReadLock(); + TopologyManager.releaseReadLockForCluster(memberActivatedEvent.getServiceName(), + memberActivatedEvent.getClusterId()); } } }); @@ -490,16 +527,20 @@ public class AutoscalerTopologyEventReceiver implements Runnable { @Override protected void onEvent(Event event) { + MemberMaintenanceModeEvent memberMaintenanceModeEvent = (MemberMaintenanceModeEvent) event; + + //TopologyManager.acquireReadLock(); + TopologyManager.acquireReadLockForCluster(memberMaintenanceModeEvent.getServiceName(), + memberMaintenanceModeEvent.getClusterId()); + try { - TopologyManager.acquireReadLock(); - MemberMaintenanceModeEvent e = (MemberMaintenanceModeEvent) event; - String memberId = e.getMemberId(); - String partitionId = e.getPartitionId(); - String networkPartitionId = e.getNetworkPartitionId(); + String memberId = memberMaintenanceModeEvent.getMemberId(); + String partitionId = memberMaintenanceModeEvent.getPartitionId(); + String networkPartitionId = memberMaintenanceModeEvent.getNetworkPartitionId(); PartitionContext partitionContext; - String clusterId = e.getClusterId(); + String clusterId = memberMaintenanceModeEvent.getClusterId(); AbstractClusterMonitor monitor; if (AutoscalerContext.getInstance().monitorExist(clusterId)) { @@ -521,7 +562,9 @@ public class AutoscalerTopologyEventReceiver implements Runnable { } catch (Exception e) { log.error("Error processing event", e); } finally { - TopologyManager.releaseReadLock(); + //TopologyManager.releaseReadLock(); + TopologyManager.releaseReadLockForCluster(memberMaintenanceModeEvent.getServiceName(), + memberMaintenanceModeEvent.getClusterId()); } } }); http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java index 6d50f6c..30b1b00 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java @@ -608,7 +608,7 @@ public class TopologyBuilder { } } - public static void handleApplicationDeployed(Application application, + public static synchronized void handleApplicationDeployed(Application application, Set<ApplicationClusterContext> applicationClusterContexts, Set<MetaDataHolder> metaDataHolders) { @@ -644,6 +644,7 @@ public class TopologyBuilder { // add to Topology and update topology.addApplication(application); TopologyManager.updateTopology(topology); + log.info("Application with id [ " + application.getId() + " ] added to Topology successfully"); TopologyEventPublisher.sendApplicationCreatedEvent(application ,clusters); @@ -653,7 +654,8 @@ public class TopologyBuilder { } } - public static void handleApplicationUndeployed(FasterLookUpDataHolder dataHolder, String applicationId, int tenantId, String tenantDomain) { + public static synchronized void handleApplicationUndeployed(FasterLookUpDataHolder dataHolder, + String applicationId, int tenantId, String tenantDomain) { Topology topology = TopologyManager.getTopology(); @@ -672,6 +674,7 @@ public class TopologyBuilder { if (service != null) { // remove Cluster service.removeCluster(clusterDataHolder.getClusterId()); + if (log.isDebugEnabled()) { log.debug("Removed cluster with id " + clusterDataHolder.getClusterId()); } http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyEventReceiver.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyEventReceiver.java index a5b5e42..4222075 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyEventReceiver.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyEventReceiver.java @@ -118,10 +118,15 @@ public class LoadBalancerTopologyEventReceiver implements Runnable { topologyEventReceiver.addEventListener(new MemberActivatedEventListener() { @Override protected void onEvent(Event event) { + + MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event; + + //TopologyManager.acquireReadLock(); + TopologyManager.acquireWriteLockForCluster(memberActivatedEvent.getServiceName(), + memberActivatedEvent.getClusterId()); + try { - TopologyManager.acquireReadLock(); - MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event; Service service = TopologyManager.getTopology().getService(memberActivatedEvent.getServiceName()); if (service == null) { if (log.isWarnEnabled()) { @@ -167,16 +172,24 @@ public class LoadBalancerTopologyEventReceiver implements Runnable { } catch (Exception e) { log.error("Error processing event", e); } finally { - TopologyManager.releaseReadLock(); + //TopologyManager.releaseReadLock(); + TopologyManager.releaseWriteLockForCluster(memberActivatedEvent.getServiceName(), + memberActivatedEvent.getClusterId()); } } }); topologyEventReceiver.addEventListener(new MemberMaintenanceListener() { @Override protected void onEvent(Event event) { + + MemberMaintenanceModeEvent memberMaintenanceModeEvent = (MemberMaintenanceModeEvent) event; + + TopologyManager.acquireWriteLockForCluster(memberMaintenanceModeEvent.getServiceName(), + memberMaintenanceModeEvent.getClusterId()); + try { - TopologyManager.acquireReadLock(); - MemberMaintenanceModeEvent memberMaintenanceModeEvent = (MemberMaintenanceModeEvent) event; + //TopologyManager.acquireReadLock(); + Member member = findMember(memberMaintenanceModeEvent.getServiceName(), memberMaintenanceModeEvent.getClusterId(), memberMaintenanceModeEvent.getMemberId()); @@ -186,16 +199,22 @@ public class LoadBalancerTopologyEventReceiver implements Runnable { } catch (Exception e) { log.error("Error processing event", e); } finally { - TopologyManager.releaseReadLock(); + //TopologyManager.releaseReadLock(); + TopologyManager.releaseWriteLockForCluster(memberMaintenanceModeEvent.getServiceName(), + memberMaintenanceModeEvent.getClusterId()); } } }); topologyEventReceiver.addEventListener(new MemberSuspendedEventListener() { @Override protected void onEvent(Event event) { + + MemberSuspendedEvent memberSuspendedEvent = (MemberSuspendedEvent) event; + TopologyManager.acquireWriteLockForCluster(memberSuspendedEvent.getServiceName(), + memberSuspendedEvent.getClusterId()); + try { - TopologyManager.acquireReadLock(); - MemberSuspendedEvent memberSuspendedEvent = (MemberSuspendedEvent) event; + //TopologyManager.acquireReadLock(); Member member = findMember(memberSuspendedEvent.getServiceName(), memberSuspendedEvent.getClusterId(), memberSuspendedEvent.getMemberId()); @@ -205,16 +224,23 @@ public class LoadBalancerTopologyEventReceiver implements Runnable { } catch (Exception e) { log.error("Error processing event", e); } finally { - TopologyManager.releaseReadLock(); + //TopologyManager.releaseReadLock(); + TopologyManager.releaseWriteLockForCluster(memberSuspendedEvent.getServiceName(), + memberSuspendedEvent.getClusterId()); } } }); topologyEventReceiver.addEventListener(new MemberTerminatedEventListener() { @Override protected void onEvent(Event event) { + + //TopologyManager.acquireReadLock(); + MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event; + + TopologyManager.acquireWriteLockForCluster(memberTerminatedEvent.getServiceName(), + memberTerminatedEvent.getClusterId()); + try { - TopologyManager.acquireReadLock(); - MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event; Member member = findMember(memberTerminatedEvent.getServiceName(), memberTerminatedEvent.getClusterId(), memberTerminatedEvent.getMemberId()); @@ -224,18 +250,23 @@ public class LoadBalancerTopologyEventReceiver implements Runnable { } catch (Exception e) { log.error("Error processing event", e); } finally { - TopologyManager.releaseReadLock(); + //TopologyManager.releaseReadLock(); + TopologyManager.releaseWriteLockForCluster(memberTerminatedEvent.getServiceName(), + memberTerminatedEvent.getClusterId()); } } }); topologyEventReceiver.addEventListener(new ClusterRemovedEventListener() { @Override protected void onEvent(Event event) { - try { - TopologyManager.acquireReadLock(); - // Remove cluster from context - ClusterRemovedEvent clusterRemovedEvent = (ClusterRemovedEvent) event; + // Remove cluster from context + ClusterRemovedEvent clusterRemovedEvent = (ClusterRemovedEvent) event; + TopologyManager.acquireWriteLockForCluster(clusterRemovedEvent.getServiceName(), + clusterRemovedEvent.getClusterId()); + + try { + //TopologyManager.acquireReadLock(); Cluster cluster = LoadBalancerContext.getInstance().getClusterIdClusterMap().getCluster(clusterRemovedEvent.getClusterId()); if (cluster != null) { for (Member member : cluster.getMembers()) { @@ -251,18 +282,23 @@ public class LoadBalancerTopologyEventReceiver implements Runnable { } catch (Exception e) { log.error("Error processing event", e); } finally { - TopologyManager.releaseReadLock(); + //TopologyManager.releaseReadLock(); + TopologyManager.releaseWriteLockForCluster(clusterRemovedEvent.getServiceName(), + clusterRemovedEvent.getClusterId()); } } }); topologyEventReceiver.addEventListener(new ServiceRemovedEventListener() { @Override protected void onEvent(Event event) { + + ServiceRemovedEvent serviceRemovedEvent = (ServiceRemovedEvent) event; + TopologyManager.acquireWriteLockForServices(); + try { - TopologyManager.acquireReadLock(); + //TopologyManager.acquireReadLock(); // Remove all clusters of given service from context - ServiceRemovedEvent serviceRemovedEvent = (ServiceRemovedEvent) event; Service service = TopologyManager.getTopology().getService(serviceRemovedEvent.getServiceName()); if (service != null) { for (Cluster cluster : service.getClusters()) { @@ -280,7 +316,8 @@ public class LoadBalancerTopologyEventReceiver implements Runnable { } catch (Exception e) { log.error("Error processing event", e); } finally { - TopologyManager.releaseReadLock(); + //TopologyManager.releaseReadLock(); + TopologyManager.releaseWriteLockForServices(); } } }); http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyEventReceiver.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyEventReceiver.java index c3a5719..d61f474 100644 --- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyEventReceiver.java +++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyEventReceiver.java @@ -23,8 +23,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.manager.exception.ADCException; import org.apache.stratos.manager.exception.ApplicationSubscriptionException; -import org.apache.stratos.manager.exception.CompositeApplicationDefinitionException; -import org.apache.stratos.manager.exception.CompositeApplicationException; import org.apache.stratos.manager.manager.CartridgeSubscriptionManager; import org.apache.stratos.manager.subscription.ApplicationSubscription; import org.apache.stratos.manager.topology.model.TopologyClusterInformationModel; @@ -95,7 +93,9 @@ public class StratosManagerTopologyEventReceiver implements Runnable { String serviceType = clustercreatedEvent.getServiceName(); //acquire read lock - TopologyManager.acquireReadLock(); + //TopologyManager.acquireReadLock(); + TopologyManager.acquireReadLockForCluster(clustercreatedEvent.getServiceName(), + clustercreatedEvent.getClusterId()); try { Cluster cluster = TopologyManager.getTopology().getService(serviceType).getCluster(clustercreatedEvent.getClusterId()); @@ -103,7 +103,9 @@ public class StratosManagerTopologyEventReceiver implements Runnable { } finally { //release read lock - TopologyManager.releaseReadLock(); + //TopologyManager.releaseReadLock(); + TopologyManager.releaseReadLockForCluster(clustercreatedEvent.getServiceName(), + clustercreatedEvent.getClusterId()); } } @@ -137,14 +139,18 @@ public class StratosManagerTopologyEventReceiver implements Runnable { String serviceType = instanceSpawnedEvent.getServiceName(); //acquire read lock - TopologyManager.acquireReadLock(); + //TopologyManager.acquireReadLock(); + TopologyManager.acquireReadLockForCluster(instanceSpawnedEvent.getServiceName(), + instanceSpawnedEvent.getClusterId()); try { Cluster cluster = TopologyManager.getTopology().getService(serviceType).getCluster(clusterDomain); TopologyClusterInformationModel.getInstance().addCluster(cluster); } finally { //release read lock - TopologyManager.releaseReadLock(); + //TopologyManager.releaseReadLock(); + TopologyManager.releaseReadLockForCluster(instanceSpawnedEvent.getServiceName(), + instanceSpawnedEvent.getClusterId()); } } }); @@ -162,14 +168,18 @@ public class StratosManagerTopologyEventReceiver implements Runnable { String serviceType = memberStartedEvent.getServiceName(); //acquire read lock - TopologyManager.acquireReadLock(); + //TopologyManager.acquireReadLock(); + TopologyManager.acquireReadLockForCluster(memberStartedEvent.getServiceName(), + memberStartedEvent.getClusterId()); try { Cluster cluster = TopologyManager.getTopology().getService(serviceType).getCluster(clusterDomain); TopologyClusterInformationModel.getInstance().addCluster(cluster); } finally { //release read lock - TopologyManager.releaseReadLock(); + //TopologyManager.releaseReadLock(); + TopologyManager.releaseReadLockForCluster(memberStartedEvent.getServiceName(), + memberStartedEvent.getClusterId()); } } @@ -188,14 +198,18 @@ public class StratosManagerTopologyEventReceiver implements Runnable { String serviceType = memberActivatedEvent.getServiceName(); //acquire read lock - TopologyManager.acquireReadLock(); + //TopologyManager.acquireReadLock(); + TopologyManager.acquireReadLockForCluster(memberActivatedEvent.getServiceName(), + memberActivatedEvent.getClusterId()); try { Cluster cluster = TopologyManager.getTopology().getService(serviceType).getCluster(clusterDomain); TopologyClusterInformationModel.getInstance().addCluster(cluster); } finally { //release read lock - TopologyManager.releaseReadLock(); + //TopologyManager.releaseReadLock(); + TopologyManager.releaseReadLockForCluster(memberActivatedEvent.getServiceName(), + memberActivatedEvent.getClusterId()); } } }); @@ -213,7 +227,9 @@ public class StratosManagerTopologyEventReceiver implements Runnable { String serviceType = memberSuspendedEvent.getServiceName(); //acquire read lock - TopologyManager.acquireReadLock(); + //TopologyManager.acquireReadLock(); + TopologyManager.acquireReadLockForCluster(memberSuspendedEvent.getServiceName(), + memberSuspendedEvent.getClusterId()); try { Cluster cluster = TopologyManager.getTopology().getService(serviceType).getCluster(clusterDomain); @@ -221,7 +237,9 @@ public class StratosManagerTopologyEventReceiver implements Runnable { } finally { //release read lock - TopologyManager.releaseReadLock(); + //TopologyManager.releaseReadLock(); + TopologyManager.releaseReadLockForCluster(memberSuspendedEvent.getServiceName(), + memberSuspendedEvent.getClusterId()); } } }); @@ -239,7 +257,9 @@ public class StratosManagerTopologyEventReceiver implements Runnable { String serviceType = memberTerminatedEvent.getServiceName(); //acquire read lock - TopologyManager.acquireReadLock(); + //TopologyManager.acquireReadLock(); + TopologyManager.acquireReadLockForCluster(memberTerminatedEvent.getServiceName(), + memberTerminatedEvent.getClusterId()); try { Cluster cluster = TopologyManager.getTopology().getService(serviceType).getCluster(clusterDomain); @@ -247,8 +267,12 @@ public class StratosManagerTopologyEventReceiver implements Runnable { // check and remove terminated member if (cluster.memberExists(memberTerminatedEvent.getMemberId())) { // release the read lock and acquire the write lock - TopologyManager.releaseReadLock(); - TopologyManager.acquireWriteLock(); +// TopologyManager.releaseReadLock(); +// TopologyManager.acquireWriteLock(); + TopologyManager.releaseReadLockForCluster(memberTerminatedEvent.getServiceName(), + memberTerminatedEvent.getClusterId()); + TopologyManager.acquireWriteLockForCluster(memberTerminatedEvent.getServiceName(), + memberTerminatedEvent.getClusterId()); try { // re-check the state; another thread might have acquired the write lock and modified @@ -263,17 +287,23 @@ public class StratosManagerTopologyEventReceiver implements Runnable { // downgrade to read lock - 1. acquire read lock, 2. release write lock // acquire read lock - TopologyManager.acquireReadLock(); + //TopologyManager.acquireReadLock(); + TopologyManager.acquireReadLockForCluster(memberTerminatedEvent.getServiceName(), + memberTerminatedEvent.getClusterId()); } finally { // release the write lock - TopologyManager.releaseWriteLock(); + // TopologyManager.releaseWriteLock(); + TopologyManager.releaseWriteLockForCluster(memberTerminatedEvent.getServiceName(), + memberTerminatedEvent.getClusterId()); } } TopologyClusterInformationModel.getInstance().addCluster(cluster); } finally { //release read lock - TopologyManager.releaseReadLock(); + //TopologyManager.releaseReadLock(); + TopologyManager.releaseReadLockForCluster(memberTerminatedEvent.getServiceName(), + memberTerminatedEvent.getClusterId()); } } }); @@ -288,7 +318,8 @@ public class StratosManagerTopologyEventReceiver implements Runnable { log.info("[ApplicationCreatedEventListener] Received: " + event.getClass()); try { - TopologyManager.acquireReadLock(); + //TopologyManager.acquireReadLock(); + TopologyManager.acquireReadLockForApplication(appCreateEvent.getApplication().getId()); // create and persist Application subscritpion CartridgeSubscriptionManager cartridgeSubscriptionManager = new CartridgeSubscriptionManager(); @@ -318,7 +349,8 @@ public class StratosManagerTopologyEventReceiver implements Runnable { PrivilegedCarbonContext.endTenantFlow(); } } finally { - TopologyManager.releaseReadLock(); + //TopologyManager.releaseReadLock(); + TopologyManager.releaseReadLockForApplication(appCreateEvent.getApplication().getId()); } } }); @@ -333,7 +365,8 @@ public class StratosManagerTopologyEventReceiver implements Runnable { log.info("[ApplicationRemovedEventListener] Received: " + event.getClass()); try { - TopologyManager.acquireReadLock(); + //TopologyManager.acquireReadLock(); + TopologyManager.acquireReadLockForApplication(appRemovedEvent.getApplicationId()); // create and persist Application subscritpion CartridgeSubscriptionManager cartridgeSubscriptionManager = new CartridgeSubscriptionManager(); @@ -360,7 +393,8 @@ public class StratosManagerTopologyEventReceiver implements Runnable { PrivilegedCarbonContext.endTenantFlow(); } } finally { - TopologyManager.releaseReadLock(); + //TopologyManager.releaseReadLock(); + TopologyManager.releaseReadLockForApplication(appRemovedEvent.getApplicationId()); } } }); http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java index 46d46d4..d03cfda 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java @@ -19,6 +19,9 @@ package org.apache.stratos.messaging.domain.topology; +import org.apache.stratos.messaging.domain.topology.locking.TopologyLock; +import org.apache.stratos.messaging.domain.topology.locking.TopologyLockHierarchy; + import java.io.Serializable; import java.util.*; @@ -59,14 +62,18 @@ public class Service implements Serializable{ public void addCluster(Cluster cluster) { this.clusterIdClusterMap.put(cluster.getClusterId(), cluster); + TopologyLockHierarchy.getInstance().addClusterLock(cluster.getClusterId(), new TopologyLock()); } public void removeCluster(Cluster cluster) { this.clusterIdClusterMap.remove(cluster.getClusterId()); + TopologyLockHierarchy.getInstance().removeTopologyLockForCluster(cluster.getClusterId()); } public Cluster removeCluster(String clusterId) { - return this.clusterIdClusterMap.remove(clusterId); + Cluster removedCluster = this.clusterIdClusterMap.remove(clusterId); + TopologyLockHierarchy.getInstance().removeTopologyLockForCluster(clusterId); + return removedCluster; } public boolean clusterExists(String clusterId) { http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Topology.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Topology.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Topology.java index f8b535f..dabf611 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Topology.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Topology.java @@ -26,8 +26,8 @@ import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.stratos.messaging.domain.topology.util.CompositeApplicationBuilder; -import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; +import org.apache.stratos.messaging.domain.topology.locking.TopologyLock; +import org.apache.stratos.messaging.domain.topology.locking.TopologyLockHierarchy; /** * Defines a topology of serviceMap in Stratos. @@ -54,6 +54,7 @@ public class Topology implements Serializable { public void addApplication (Application application) { this.applicationMap.put(application.getId(), application); + TopologyLockHierarchy.getInstance().addApplicationLock(application.getId(), new TopologyLock()); } public Application getApplication (String applicationId) { @@ -62,6 +63,7 @@ public class Topology implements Serializable { public void removeApplication (String applicationId) { applicationMap.remove(applicationId); + TopologyLockHierarchy.getInstance().removeTopologyLockForApplication(applicationId); } public Collection<Application> getApplications () { @@ -78,9 +80,10 @@ public class Topology implements Serializable { public void addService(Service service) { this.serviceMap.put(service.getServiceName(), service); + TopologyLockHierarchy.getInstance().addServiceLock(service.getServiceName(), new TopologyLock()); } - public void addServices(Collection<Service> services) { + public synchronized void addServices(Collection<Service> services) { for (Service service : services) { addService(service); } @@ -88,10 +91,12 @@ public class Topology implements Serializable { public void removeService(Service service) { this.serviceMap.remove(service.getServiceName()); + TopologyLockHierarchy.getInstance().removeTopologyLockForService(service.getServiceName()); } public void removeService(String serviceName) { this.serviceMap.remove(serviceName); + TopologyLockHierarchy.getInstance().removeTopologyLockForService(serviceName); } public Service getService(String serviceName) { http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/locking/TopologyLock.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/locking/TopologyLock.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/locking/TopologyLock.java new file mode 100644 index 0000000..e1b90ad --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/locking/TopologyLock.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.messaging.domain.topology.locking; + +import java.util.concurrent.locks.ReentrantReadWriteLock; + +public class TopologyLock { + + private final ReentrantReadWriteLock lock; + + public TopologyLock () { + lock = new ReentrantReadWriteLock(true); + } + + public void acquireWriteLock() { + lock.writeLock().lock(); + } + + public void releaseWritelock() { + if (lock.isWriteLockedByCurrentThread()) { + lock.writeLock().unlock(); + } + } + + public void acquireReadLock() { + lock.readLock().lock(); + } + + public void releaseReadLock() { + lock.readLock().unlock(); + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/locking/TopologyLockHierarchy.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/locking/TopologyLockHierarchy.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/locking/TopologyLockHierarchy.java new file mode 100644 index 0000000..aa0f7a1 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/locking/TopologyLockHierarchy.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.messaging.domain.topology.locking; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class TopologyLockHierarchy { + + private static final Log log = LogFactory.getLog(TopologyLockHierarchy.class); + + // lock for Services + private TopologyLock serviceLock; + + // lock for Applications + private TopologyLock applicatioLock; + + // key = Service.name + private Map<String, TopologyLock> serviceNameToTopologyLockMap; + + // key = Application.id + private Map<String, TopologyLock> applicationIdToTopologyLockMap; + + // key = Cluster.id + private Map<String, TopologyLock> clusterIdToTopologyLockMap; + + private static volatile TopologyLockHierarchy topologyLockHierarchy; + + private TopologyLockHierarchy () { + + this.serviceLock = new TopologyLock(); + this.applicatioLock = new TopologyLock(); + this.serviceNameToTopologyLockMap = new ConcurrentHashMap<String, TopologyLock>(); + this.applicationIdToTopologyLockMap = new ConcurrentHashMap<String, TopologyLock>(); + this.clusterIdToTopologyLockMap = new ConcurrentHashMap<String, TopologyLock>(); + } + + public static TopologyLockHierarchy getInstance () { + + if (topologyLockHierarchy == null) { + synchronized (TopologyLockHierarchy.class) { + if (topologyLockHierarchy == null) { + topologyLockHierarchy = new TopologyLockHierarchy(); + } + } + } + + return topologyLockHierarchy; + } + + public void addApplicationLock (String appId, final TopologyLock topologyLock) { + + if (!applicationIdToTopologyLockMap.containsKey(appId)) { + synchronized (applicationIdToTopologyLockMap) { + if (!applicationIdToTopologyLockMap.containsKey(appId)) { + applicationIdToTopologyLockMap.put(appId, topologyLock); + log.info("Added lock for Application " + appId); + } + } + } else { + log.warn("Topology Lock for Application " + appId + " already exists"); + } + } + + public TopologyLock getTopologyLockForApplication (String appId) { + return applicationIdToTopologyLockMap.get(appId); + } + + public void addServiceLock (String serviceName, final TopologyLock topologyLock) { + + if (!serviceNameToTopologyLockMap.containsKey(serviceName)) { + synchronized (serviceNameToTopologyLockMap) { + if (!serviceNameToTopologyLockMap.containsKey(serviceName)) { + serviceNameToTopologyLockMap.put(serviceName, topologyLock); + log.info("Added lock for Service " + serviceName); + } + } + } else { + log.warn("Topology Lock for Service " + serviceName + " already exists"); + } + } + + public TopologyLock getTopologyLockForService (String serviceName) { + return serviceNameToTopologyLockMap.get(serviceName); + } + + public void addClusterLock (String clusterId, final TopologyLock topologyLock) { + + if (!clusterIdToTopologyLockMap.containsKey(clusterId)) { + synchronized (clusterIdToTopologyLockMap) { + if (!clusterIdToTopologyLockMap.containsKey(clusterId)) { + clusterIdToTopologyLockMap.put(clusterId, topologyLock); + log.info("Added lock for Cluster " + clusterId); + } + } + } else { + log.warn("Topology Lock for Cluster " + clusterId + " already exists"); + } + } + + public TopologyLock getTopologyLockForCluster (String clusterId) { + return clusterIdToTopologyLockMap.get(clusterId); + } + + public void removeTopologyLockForApplication (String appId) { + applicationIdToTopologyLockMap.remove(appId); + log.info("Removed lock for Application " + appId); + } + + public void removeTopologyLockForService (String serviceName) { + serviceNameToTopologyLockMap.remove(serviceName); + log.info("Removed lock for Service " + serviceName); + } + + public void removeTopologyLockForCluster (String clusterId) { + clusterIdToTopologyLockMap.remove(clusterId); + log.info("Removed lock for Cluster " + clusterId); + } + + public TopologyLock getServiceLock() { + return serviceLock; + } + + public TopologyLock getApplicatioLock() { + return applicatioLock; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationActivatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationActivatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationActivatedMessageProcessor.java index 8c1e66b..803a871 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationActivatedMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationActivatedMessageProcessor.java @@ -25,6 +25,7 @@ import org.apache.stratos.messaging.domain.topology.Status; import org.apache.stratos.messaging.domain.topology.Topology; import org.apache.stratos.messaging.event.topology.ApplicationActivatedEvent; import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; import org.apache.stratos.messaging.util.Util; /** @@ -56,26 +57,16 @@ public class ApplicationActivatedMessageProcessor extends MessageProcessor { ApplicationActivatedEvent event = (ApplicationActivatedEvent) Util. jsonToObject(message, ApplicationActivatedEvent.class); - // Validate event against the existing topology - Application application = topology.getApplication(event.getAppId()); - if (application == null) { - if (log.isWarnEnabled()) { - log.warn(String.format("Application does not exist: [service] %s", - event.getAppId())); - } - return false; - } else { - // Apply changes to the topology - application.setStatus(Status.Activated); - if (log.isInfoEnabled()) { - log.info(String.format("Application updated as activated : %s", - application.toString())); - } - } + TopologyManager.acquireReadLockForApplications(); + TopologyManager.acquireWriteLockForApplication(event.getAppId()); - // Notify event listeners - notifyEventListeners(event); - return true; + try { + return doProcess(event, topology); + + } finally { + TopologyManager.releaseWriteLockForApplication(event.getAppId()); + TopologyManager.releaseReadLockForApplications(); + } } else { if (nextProcessor != null) { @@ -86,4 +77,29 @@ public class ApplicationActivatedMessageProcessor extends MessageProcessor { } } } + + private boolean doProcess (ApplicationActivatedEvent event, Topology topology) { + + // Validate event against the existing topology + Application application = topology.getApplication(event.getAppId()); + if (application == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Application does not exist: [service] %s", + event.getAppId())); + } + return false; + } else { + // Apply changes to the topology + application.setStatus(Status.Activated); + if (log.isInfoEnabled()) { + log.info(String.format("Application updated as activated : %s", + application.toString())); + } + } + + // Notify event listeners + notifyEventListeners(event); + return true; + + } } http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationCreatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationCreatedMessageProcessor.java index f02e4be..4368bd7 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationCreatedMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationCreatedMessageProcessor.java @@ -25,6 +25,7 @@ import org.apache.stratos.messaging.domain.topology.Cluster; import org.apache.stratos.messaging.domain.topology.Topology; import org.apache.stratos.messaging.event.topology.ApplicationCreatedEvent; import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; import org.apache.stratos.messaging.util.Util; public class ApplicationCreatedMessageProcessor extends MessageProcessor { @@ -47,40 +48,21 @@ public class ApplicationCreatedMessageProcessor extends MessageProcessor { return false; } - ApplicationCreatedEvent appCreatedEvent = (ApplicationCreatedEvent) Util.jsonToObject(message, ApplicationCreatedEvent.class); - if (appCreatedEvent == null) { + ApplicationCreatedEvent event = (ApplicationCreatedEvent) Util.jsonToObject(message, ApplicationCreatedEvent.class); + if (event == null) { log.error("Unable to convert the JSON message to ApplicationCreatedEvent"); return false; } - // check if required properties are available - if (appCreatedEvent.getApplication() == null) { - String errorMsg = "Application object of application created event is invalid"; - log.error(errorMsg); - throw new RuntimeException(errorMsg); - } - - if (appCreatedEvent.getApplication().getId() == null || appCreatedEvent.getApplication().getId().isEmpty()) { - String errorMsg = "App id of application created event is invalid: [ " + appCreatedEvent.getApplication().getId() + " ]"; - log.error(errorMsg); - throw new RuntimeException(errorMsg); - } + TopologyManager.acquireWriteLockForApplications(); - // check if an Application with same name exists in topology - if (topology.applicationExists(appCreatedEvent.getApplication().getId())) { - log.warn("Application with id [ " + appCreatedEvent.getApplication().getId() + " ] already exists in Topology"); + try { + return doProcess(event, topology); - } else { - // add application and the clusters to Topology - for(Cluster cluster: appCreatedEvent.getClusterList()) { - topology.getService(cluster.getServiceName()).addCluster(cluster); - } - topology.addApplication(appCreatedEvent.getApplication()); + } finally { + TopologyManager.releaseWriteLockForApplications(); } - notifyEventListeners(appCreatedEvent); - return true; - } else { if (nextProcessor != null) { // ask the next processor to take care of the message. @@ -90,4 +72,35 @@ public class ApplicationCreatedMessageProcessor extends MessageProcessor { } } } + + private boolean doProcess (ApplicationCreatedEvent event,Topology topology) { + + // check if required properties are available + if (event.getApplication() == null) { + String errorMsg = "Application object of application created event is invalid"; + log.error(errorMsg); + throw new RuntimeException(errorMsg); + } + + if (event.getApplication().getId() == null || event.getApplication().getId().isEmpty()) { + String errorMsg = "App id of application created event is invalid: [ " + event.getApplication().getId() + " ]"; + log.error(errorMsg); + throw new RuntimeException(errorMsg); + } + + // check if an Application with same name exists in topology + if (topology.applicationExists(event.getApplication().getId())) { + log.warn("Application with id [ " + event.getApplication().getId() + " ] already exists in Topology"); + + } else { + // add application and the clusters to Topology + for(Cluster cluster: event.getClusterList()) { + topology.getService(cluster.getServiceName()).addCluster(cluster); + } + topology.addApplication(event.getApplication()); + } + + notifyEventListeners(event); + return true; + } } http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationRemovedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationRemovedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationRemovedMessageProcessor.java index c9dbb07..ed6c2d4 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationRemovedMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationRemovedMessageProcessor.java @@ -22,9 +22,9 @@ package org.apache.stratos.messaging.message.processor.topology; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.messaging.domain.topology.Topology; -import org.apache.stratos.messaging.event.topology.ApplicationCreatedEvent; import org.apache.stratos.messaging.event.topology.ApplicationRemovedEvent; import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; import org.apache.stratos.messaging.util.Util; public class ApplicationRemovedMessageProcessor extends MessageProcessor { @@ -55,38 +55,20 @@ public class ApplicationRemovedMessageProcessor extends MessageProcessor { return false; } - ApplicationRemovedEvent appRemovedEvent = (ApplicationRemovedEvent) Util.jsonToObject(message, ApplicationRemovedEvent.class); - if (appRemovedEvent == null) { + ApplicationRemovedEvent event = (ApplicationRemovedEvent) Util.jsonToObject(message, ApplicationRemovedEvent.class); + if (event == null) { log.error("Unable to convert the JSON message to ApplicationCreatedEvent"); return false; } - - // check if required properties are available - if (appRemovedEvent.getApplicationId() == null) { - String errorMsg = "Application Id of application removed event is invalid"; - log.error(errorMsg); - throw new RuntimeException(errorMsg); - } - - if (appRemovedEvent.getTenantDomain()== null) { - String errorMsg = "Application tenant domain of application removed event is invalid"; - log.error(errorMsg); - throw new RuntimeException(errorMsg); - } - - // check if an Application with same name exists in topology - String appId = appRemovedEvent.getApplicationId(); - if (topology.applicationExists(appId)) { - log.warn("Application with id [ " + appId + " ] still exists in Topology, removing it"); - topology.removeApplication(appId); - } - - if (log.isDebugEnabled()) { - log.debug("ApplicationRemovedMessageProcessor notifying listener " + object); - } - - notifyEventListeners(appRemovedEvent); - return true; + + TopologyManager.acquireWriteLockForApplications(); + + try { + return doProcess(event, topology); + + } finally { + TopologyManager.releaseWriteLockForApplications(); + } } else { if (nextProcessor != null) { @@ -96,6 +78,35 @@ public class ApplicationRemovedMessageProcessor extends MessageProcessor { throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); } } - + } + + private boolean doProcess (ApplicationRemovedEvent event, Topology topology) { + + // check if required properties are available + if (event.getApplicationId() == null) { + String errorMsg = "Application Id of application removed event is invalid"; + log.error(errorMsg); + throw new RuntimeException(errorMsg); + } + + if (event.getTenantDomain()== null) { + String errorMsg = "Application tenant domain of application removed event is invalid"; + log.error(errorMsg); + throw new RuntimeException(errorMsg); + } + + // check if an Application with same name exists in topology + String appId = event.getApplicationId(); + if (topology.applicationExists(appId)) { + log.warn("Application with id [ " + appId + " ] still exists in Topology, removing it"); + topology.removeApplication(appId); + } + + if (log.isDebugEnabled()) { + log.debug("ApplicationRemovedMessageProcessor notifying listener "); + } + + notifyEventListeners(event); + return true; } } http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterActivatedProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterActivatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterActivatedProcessor.java index 52de45b..78f772b 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterActivatedProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterActivatedProcessor.java @@ -25,10 +25,10 @@ import org.apache.stratos.messaging.domain.topology.Service; import org.apache.stratos.messaging.domain.topology.Status; import org.apache.stratos.messaging.domain.topology.Topology; import org.apache.stratos.messaging.event.topology.ClusterActivatedEvent; -import org.apache.stratos.messaging.event.topology.ClusterMaintenanceModeEvent; import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter; import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; import org.apache.stratos.messaging.util.Util; /** @@ -45,75 +45,91 @@ public class ClusterActivatedProcessor extends MessageProcessor { @Override public boolean process(String type, String message, Object object) { + Topology topology = (Topology) object; if (ClusterActivatedEvent.class.getName().equals(type)) { // Return if topology has not been initialized - if (!topology.isInitialized()) + if (!topology.isInitialized()) { return false; + } // Parse complete message and build event ClusterActivatedEvent event = (ClusterActivatedEvent) Util. jsonToObject(message, ClusterActivatedEvent.class); - // Apply service filter - if (TopologyServiceFilter.getInstance().isActive()) { - if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) { - // Service is excluded, do not update topology or fire event - if (log.isDebugEnabled()) { - log.debug(String.format("Service is excluded: [service] %s", event.getServiceName())); - } - return false; - } + TopologyManager.acquireReadLockForServices(); + TopologyManager.acquireWriteLockForService(event.getServiceName()); + try { + return doProcess(event, topology); + + } finally { + TopologyManager.releaseWriteLockForService(event.getServiceName()); + TopologyManager.releaseReadLockForServices(); } - // Apply cluster filter - if (TopologyClusterFilter.getInstance().isActive()) { - if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) { - // Cluster is excluded, do not update topology or fire event - if (log.isDebugEnabled()) { - log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId())); - } - return false; - } + } else { + if (nextProcessor != null) { + // ask the next processor to take care of the message. + return nextProcessor.process(type, message, topology); + } else { + throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); } + } + } + + private boolean doProcess (ClusterActivatedEvent event,Topology topology) { - // Validate event against the existing topology - Service service = topology.getService(event.getServiceName()); - if (service == null) { - if (log.isWarnEnabled()) { - log.warn(String.format("Service does not exist: [service] %s", - event.getServiceName())); + // Apply service filter + if (TopologyServiceFilter.getInstance().isActive()) { + if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) { + // Service is excluded, do not update topology or fire event + if (log.isDebugEnabled()) { + log.debug(String.format("Service is excluded: [service] %s", event.getServiceName())); } return false; } - Cluster cluster = service.getCluster(event.getClusterId()); + } - if (cluster == null) { - if (log.isWarnEnabled()) { - log.warn(String.format("Cluster not exists in service: [service] %s [cluster] %s", event.getServiceName(), - event.getClusterId())); - } - } else { - // Apply changes to the topology - cluster.setStatus(Status.Activated); - if (log.isInfoEnabled()) { - log.info(String.format("Cluster updated as activated : %s", - cluster.toString())); + // Apply cluster filter + if (TopologyClusterFilter.getInstance().isActive()) { + if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) { + // Cluster is excluded, do not update topology or fire event + if (log.isDebugEnabled()) { + log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId())); } + return false; } + } - // Notify event listeners - notifyEventListeners(event); - return true; + // Validate event against the existing topology + Service service = topology.getService(event.getServiceName()); + if (service == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Service does not exist: [service] %s", + event.getServiceName())); + } + return false; + } + Cluster cluster = service.getCluster(event.getClusterId()); + if (cluster == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Cluster not exists in service: [service] %s [cluster] %s", event.getServiceName(), + event.getClusterId())); + } } else { - if (nextProcessor != null) { - // ask the next processor to take care of the message. - return nextProcessor.process(type, message, topology); - } else { - throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); + // Apply changes to the topology + cluster.setStatus(Status.Activated); + if (log.isInfoEnabled()) { + log.info(String.format("Cluster updated as activated : %s", + cluster.toString())); } } + + // Notify event listeners + notifyEventListeners(event); + return true; } + }
