moving applications stuff to autoscaler - II
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/c42d7c1d Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/c42d7c1d Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/c42d7c1d Branch: refs/heads/4.0.0-grouping Commit: c42d7c1dc232256e4bbb7a10bda7e8f110ddb795 Parents: d4f90be Author: Isuru Haththotuwa <[email protected]> Authored: Fri Oct 31 15:55:27 2014 +0530 Committer: Isuru Haththotuwa <[email protected]> Committed: Fri Oct 31 16:07:05 2014 +0530 ---------------------------------------------------------------------- .../topic/ApplicationsEventPublisher.java | 56 ++++----- .../AutoscalerTopologyEventReceiver.java | 7 +- .../monitor/ApplicationMonitorFactory.java | 13 +- .../status/checker/StatusChecker.java | 21 ++-- .../topology/TopologyEventPublisher.java | 2 +- .../domain/applications/Applications.java | 2 +- .../locking/ApplicationLockHierarchy.java | 4 +- .../topology/locking/TopologyLockHierarchy.java | 42 ------- .../ApplicationActivatedMessageProcessor.java | 24 ++-- .../ApplicationCreatedMessageProcessor.java | 18 ++- .../ApplicationInactivatedMessageProcessor.java | 24 ++-- .../ApplicationTerminatedMessageProcessor.java | 43 +++---- .../ApplicationTerminatingMessageProcessor.java | 24 ++-- .../ApplicationUndeployedMessageProcessor.java | 61 +++++----- .../CompleteApplicationsMessageProcessor.java | 17 +-- .../applications/GroupActivatedProcessor.java | 20 ++-- .../applications/GroupCreatedProcessor.java | 24 ++-- .../applications/GroupInActivateProcessor.java | 22 ++-- .../applications/GroupTerminatedProcessor.java | 24 ++-- .../applications/GroupTerminatingProcessor.java | 24 ++-- .../updater/ApplicationsUpdater.java | 23 ++-- .../CompleteTopologyMessageProcessor.java | 15 --- .../topology/updater/TopologyUpdater.java | 68 +---------- .../applications/ApplicationManager.java | 119 +++++++++++++++++++ .../receiver/topology/TopologyManager.java | 67 +---------- 25 files changed, 351 insertions(+), 413 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/c42d7c1d/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationsEventPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationsEventPublisher.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationsEventPublisher.java index f8d989f..bdbffc0 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationsEventPublisher.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationsEventPublisher.java @@ -7,7 +7,7 @@ import org.apache.stratos.messaging.broker.publish.EventPublisherPool; import org.apache.stratos.messaging.domain.applications.*; import org.apache.stratos.messaging.event.Event; import org.apache.stratos.messaging.event.applications.*; -import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; +import org.apache.stratos.messaging.message.receiver.applications.ApplicationManager; import org.apache.stratos.messaging.util.Constants; import java.util.Set; @@ -21,8 +21,8 @@ public class ApplicationsEventPublisher { public static void sendGroupCreatedEvent(String appId, String groupId) { try { - TopologyManager.acquireReadLockForApplication(appId); - Application application = TopologyManager.getTopology().getApplication(appId); + ApplicationManager.acquireReadLockForApplication(appId); + Application application = ApplicationManager.getApplications().getApplication(appId); if (application != null) { Group group = application.getGroupRecursively(groupId); if (group.isStateTransitionValid(GroupStatus.Created)) { @@ -39,14 +39,14 @@ public class ApplicationsEventPublisher { } } } finally { - TopologyManager.releaseReadLockForApplication(appId); + ApplicationManager.releaseReadLockForApplication(appId); } } public static void sendGroupActivatedEvent(String appId, String groupId) { try { - TopologyManager.acquireReadLockForApplication(appId); - Application application = TopologyManager.getTopology().getApplication(appId); + ApplicationManager.acquireReadLockForApplication(appId); + Application application = ApplicationManager.getApplications().getApplication(appId); if (application != null) { Group group = application.getGroupRecursively(groupId); if (group.isStateTransitionValid(GroupStatus.Active)) { @@ -63,14 +63,14 @@ public class ApplicationsEventPublisher { } } } finally { - TopologyManager.releaseReadLockForApplication(appId); + ApplicationManager.releaseReadLockForApplication(appId); } } public static void sendGroupInActivateEvent(String appId, String groupId) { try { - TopologyManager.acquireReadLockForApplication(appId); - Application application = TopologyManager.getTopology().getApplication(appId); + ApplicationManager.acquireReadLockForApplication(appId); + Application application = ApplicationManager.getApplications().getApplication(appId); if (application != null) { Group group = application.getGroupRecursively(groupId); if (group.isStateTransitionValid(GroupStatus.Inactive)) { @@ -87,14 +87,14 @@ public class ApplicationsEventPublisher { } } } finally { - TopologyManager.releaseReadLockForApplication(appId); + ApplicationManager.releaseReadLockForApplication(appId); } } public static void sendGroupTerminatingEvent(String appId, String groupId) { try { - TopologyManager.acquireReadLockForApplication(appId); - Application application = TopologyManager.getTopology().getApplication(appId); + ApplicationManager.acquireReadLockForApplication(appId); + Application application = ApplicationManager.getApplications().getApplication(appId); if (application != null) { Group group = application.getGroupRecursively(groupId); if (group.isStateTransitionValid(GroupStatus.Terminating)) { @@ -110,7 +110,7 @@ public class ApplicationsEventPublisher { } } } finally { - TopologyManager.releaseReadLockForApplication(appId); + ApplicationManager.releaseReadLockForApplication(appId); } } @@ -122,8 +122,8 @@ public class ApplicationsEventPublisher { } try { - TopologyManager.acquireReadLockForApplication(appId); - Application application = TopologyManager.getTopology().getApplication(appId); + ApplicationManager.acquireReadLockForApplication(appId); + Application application = ApplicationManager.getApplications().getApplication(appId); if (application != null) { Group group = application.getGroupRecursively(groupId); if (group.isStateTransitionValid(GroupStatus.Terminated)) { @@ -135,7 +135,7 @@ public class ApplicationsEventPublisher { } } } finally { - TopologyManager.releaseReadLockForApplication(appId); + ApplicationManager.releaseReadLockForApplication(appId); } @@ -143,8 +143,8 @@ public class ApplicationsEventPublisher { public static void sendApplicationActivatedEvent(String appId) { try { - TopologyManager.acquireReadLockForApplication(appId); - Application application = TopologyManager.getTopology().getApplication(appId); + ApplicationManager.acquireReadLockForApplication(appId); + Application application = ApplicationManager.getApplications().getApplication(appId); if (application != null) { if (application.isStateTransitionValid(ApplicationStatus.Active)) { if (log.isInfoEnabled()) { @@ -159,7 +159,7 @@ public class ApplicationsEventPublisher { } } } finally { - TopologyManager.releaseReadLockForApplication(appId); + ApplicationManager.releaseReadLockForApplication(appId); } } @@ -169,8 +169,8 @@ public class ApplicationsEventPublisher { } try { - TopologyManager.acquireReadLockForApplication(appId); - Application application = TopologyManager.getTopology().getApplication(appId); + ApplicationManager.acquireReadLockForApplication(appId); + Application application = ApplicationManager.getApplications().getApplication(appId); if (application != null) { if (application.isStateTransitionValid(ApplicationStatus.Inactive)) { ApplicationInactivatedEvent applicationInActivatedEvent = @@ -181,14 +181,14 @@ public class ApplicationsEventPublisher { } } } finally { - TopologyManager.releaseReadLockForApplication(appId); + ApplicationManager.releaseReadLockForApplication(appId); } } public static void sendApplicationTerminatingEvent(String appId) { try { - TopologyManager.acquireReadLockForApplication(appId); - Application application = TopologyManager.getTopology().getApplication(appId); + ApplicationManager.acquireReadLockForApplication(appId); + Application application = ApplicationManager.getApplications().getApplication(appId); if (application != null) { if (application.isStateTransitionValid(ApplicationStatus.Terminating)) { if (log.isInfoEnabled()) { @@ -202,14 +202,14 @@ public class ApplicationsEventPublisher { } } } finally { - TopologyManager.releaseReadLockForApplication(appId); + ApplicationManager.releaseReadLockForApplication(appId); } } public static void sendApplicationTerminatedEvent(String appId, Set<ClusterDataHolder> clusterData) { try { - TopologyManager.acquireReadLockForApplication(appId); - Application application = TopologyManager.getTopology().getApplication(appId); + ApplicationManager.acquireReadLockForApplication(appId); + Application application = ApplicationManager.getApplications().getApplication(appId); if (application != null) { if (application.isStateTransitionValid(ApplicationStatus.Terminated)) { if (log.isInfoEnabled()) { @@ -223,7 +223,7 @@ public class ApplicationsEventPublisher { } } } finally { - TopologyManager.releaseReadLockForApplication(appId); + ApplicationManager.releaseReadLockForApplication(appId); } } http://git-wip-us.apache.org/repos/asf/stratos/blob/c42d7c1d/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java index 18929ed..dcf3a82 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java @@ -45,6 +45,7 @@ import org.apache.stratos.messaging.event.Event; import org.apache.stratos.messaging.event.topology.*; import org.apache.stratos.messaging.listener.applications.ApplicationUndeployedEventListener; import org.apache.stratos.messaging.listener.topology.*; +import org.apache.stratos.messaging.message.receiver.applications.ApplicationManager; import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver; import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; @@ -103,7 +104,7 @@ public class AutoscalerTopologyEventReceiver implements Runnable { TopologyManager.acquireReadLock(); try { - for (Application application : TopologyManager.getTopology().getApplications()) { + for (Application application : ApplicationManager.getApplications().getApplications().values()) { startApplicationMonitor(application.getUniqueIdentifier()); } @@ -172,8 +173,8 @@ public class AutoscalerTopologyEventReceiver implements Runnable { log.info("[ClusterCreatedEvent] Received: " + event.getClass()); - ClusterResetEvent clusterCreatedEvent = (ClusterResetEvent) event; - String clusterId = clusterCreatedEvent.getClusterId(); + ClusterCreatedEvent clusterCreatedEvent = (ClusterCreatedEvent) event; + String clusterId = clusterCreatedEvent.getCluster().getClusterId(); AbstractClusterMonitor clusterMonitor = (AbstractClusterMonitor) AutoscalerContext.getInstance().getMonitor(clusterId); http://git-wip-us.apache.org/repos/asf/stratos/blob/c42d7c1d/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ApplicationMonitorFactory.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ApplicationMonitorFactory.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ApplicationMonitorFactory.java index 1e64d78..f693310 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ApplicationMonitorFactory.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ApplicationMonitorFactory.java @@ -47,6 +47,7 @@ import org.apache.stratos.cloud.controller.stub.pojo.Property; import org.apache.stratos.messaging.domain.applications.Application; import org.apache.stratos.messaging.domain.applications.Group; import org.apache.stratos.messaging.domain.topology.*; +import org.apache.stratos.messaging.message.receiver.applications.ApplicationManager; import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; import org.apache.stratos.messaging.util.Constants; @@ -102,10 +103,10 @@ public class ApplicationMonitorFactory { throws DependencyBuilderException, TopologyInConsistentException { GroupMonitor groupMonitor; - TopologyManager.acquireReadLockForApplication(appId); + ApplicationManager.acquireReadLockForApplication(appId); try { - Group group = TopologyManager.getTopology().getApplication(appId).getGroupRecursively(context.getId()); + Group group = ApplicationManager.getApplications().getApplication(appId).getGroupRecursively(context.getId()); groupMonitor = new GroupMonitor(group, appId); groupMonitor.setAppId(appId); if(parentMonitor != null) { @@ -126,7 +127,7 @@ public class ApplicationMonitorFactory { } } finally { - TopologyManager.releaseReadLockForApplication(appId); + ApplicationManager.releaseReadLockForApplication(appId); } return groupMonitor; @@ -146,9 +147,9 @@ public class ApplicationMonitorFactory { throws DependencyBuilderException, TopologyInConsistentException { ApplicationMonitor applicationMonitor; - TopologyManager.acquireReadLockForApplication(appId); + ApplicationManager.acquireReadLockForApplication(appId); try { - Application application = TopologyManager.getTopology().getApplication(appId); + Application application = ApplicationManager.getApplications().getApplication(appId); if (application != null) { applicationMonitor = new ApplicationMonitor(application); applicationMonitor.setHasDependent(false); @@ -158,7 +159,7 @@ public class ApplicationMonitorFactory { throw new TopologyInConsistentException(msg); } } finally { - TopologyManager.releaseReadLockForApplication(appId); + ApplicationManager.releaseReadLockForApplication(appId); } return applicationMonitor; http://git-wip-us.apache.org/repos/asf/stratos/blob/c42d7c1d/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java index eafa269..5468f6d 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java @@ -30,6 +30,7 @@ import org.apache.stratos.autoscaler.monitor.AbstractClusterMonitor; import org.apache.stratos.autoscaler.monitor.cluster.ClusterMonitor; import org.apache.stratos.messaging.domain.applications.*; import org.apache.stratos.messaging.domain.topology.*; +import org.apache.stratos.messaging.message.receiver.applications.ApplicationManager; import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; import java.util.Map; @@ -98,8 +99,8 @@ public class StatusChecker { if (cluster != null) { try { - TopologyManager.acquireReadLockForApplication(appId); - Application application = TopologyManager.getTopology().getApplication(appId); + ApplicationManager.acquireReadLockForApplication(appId); + Application application = ApplicationManager.getApplications().getApplication(appId); if (!clusterMonitorHasMembers && cluster.getStatus() == ClusterStatus.Terminating) { if (application.getStatus() == ApplicationStatus.Terminating) { @@ -123,7 +124,7 @@ public class StatusChecker { }*/ } } finally { - TopologyManager.releaseReadLockForApplication(appId); + ApplicationManager.releaseReadLockForApplication(appId); } } } @@ -233,22 +234,22 @@ public class StatusChecker { Runnable group = new Runnable() { public void run() { try { - TopologyManager.acquireReadLockForApplication(appId); + ApplicationManager.acquireReadLockForApplication(appId); ParentComponent component; if (groupId.equals(appId)) { //it is an application - component = TopologyManager.getTopology(). + component = ApplicationManager.getApplications(). getApplication(appId); } else { //it is a group - component = TopologyManager.getTopology(). + component = ApplicationManager.getApplications(). getApplication(appId).getGroupRecursively(groupId); } Map<String, ClusterDataHolder> clusterIds = component.getClusterDataMap(); Map<String, Group> groups = component.getAliasToGroupMap(); updateChildStatus(appId, idOfChild, groups, clusterIds, component); } finally { - TopologyManager.releaseReadLockForApplication(appId); + ApplicationManager.releaseReadLockForApplication(appId); } @@ -286,8 +287,8 @@ public class StatusChecker { clusterStatus = getClusterStatus(clusterData); groupStatus = getGroupStatus(groups); try { - TopologyManager.acquireReadLockForApplication(appId); - Application application = TopologyManager.getTopology().getApplication(appId); + ApplicationManager.acquireReadLockForApplication(appId); + Application application = ApplicationManager.getApplications().getApplication(appId); if (groups.isEmpty() && getAllClusterInSameState(clusterData,ClusterStatus.Active) || clusterData.isEmpty() && getAllGroupInSameState(groups, GroupStatus.Active) || @@ -352,7 +353,7 @@ public class StatusChecker { log.warn("Clusters/groups not found in this [component] " + appId); } } finally { - TopologyManager.releaseReadLockForApplication(appId); + ApplicationManager.releaseReadLockForApplication(appId); } http://git-wip-us.apache.org/repos/asf/stratos/blob/c42d7c1d/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java index f96ba8e..2e0883b 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java @@ -78,7 +78,7 @@ public class TopologyEventPublisher { } public static void sendClusterCreatedEvent(String appId, String serviceName, String clusterId) { - ClusterCreatedEvent clusterCreatedEvent = new ClusterCreatedEvent(appId,serviceName, clusterId); + ClusterCreatedEvent clusterCreatedEvent = new ClusterCreatedEvent(new Cluster()); if(log.isInfoEnabled()) { log.info("Publishing cluster created event: " +clusterId); http://git-wip-us.apache.org/repos/asf/stratos/blob/c42d7c1d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/applications/Applications.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/applications/Applications.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/applications/Applications.java index e5a7921..e1feb82 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/applications/Applications.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/applications/Applications.java @@ -68,6 +68,6 @@ public class Applications implements Serializable { public synchronized void removeApplication (String appId) { this.applicationMap.remove(appId); - ApplicationLockHierarchy.getInstance().removeLock(appId); + ApplicationLockHierarchy.getInstance().removeLockForApplication(appId); } } http://git-wip-us.apache.org/repos/asf/stratos/blob/c42d7c1d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/applications/locking/ApplicationLockHierarchy.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/applications/locking/ApplicationLockHierarchy.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/applications/locking/ApplicationLockHierarchy.java index cc31892..2b457d7 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/applications/locking/ApplicationLockHierarchy.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/applications/locking/ApplicationLockHierarchy.java @@ -71,11 +71,11 @@ public class ApplicationLockHierarchy { } } - public ApplicationLock getLock (String appId) { + public ApplicationLock getLockForApplication(String appId) { return appIdToApplicationLockMap.get(appId); } - public void removeLock (String appId) { + public void removeLockForApplication (String appId) { if (appIdToApplicationLockMap.remove(appId) != null) { log.info("Removed lock for Application " + appId); } else { http://git-wip-us.apache.org/repos/asf/stratos/blob/c42d7c1d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/locking/TopologyLockHierarchy.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/locking/TopologyLockHierarchy.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/locking/TopologyLockHierarchy.java index bb6b8fa..e89df3a 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/locking/TopologyLockHierarchy.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/locking/TopologyLockHierarchy.java @@ -35,15 +35,9 @@ public class TopologyLockHierarchy { // lock for Services private final TopologyLock serviceLock; - // lock for Applications - private final TopologyLock applicatioLock; - // key = Service.name private final Map<String, TopologyLock> serviceNameToTopologyLockMap; - // key = Application.id - private final Map<String, TopologyLock> applicationIdToTopologyLockMap; - // key = Cluster.id private final Map<String, TopologyLock> clusterIdToTopologyLockMap; @@ -53,9 +47,7 @@ public class TopologyLockHierarchy { this.completeTopologyLock = new TopologyLock(); this.serviceLock = new TopologyLock(); - this.applicatioLock = new TopologyLock(); this.serviceNameToTopologyLockMap = new ConcurrentHashMap<String, TopologyLock>(); - this.applicationIdToTopologyLockMap = new ConcurrentHashMap<String, TopologyLock>(); this.clusterIdToTopologyLockMap = new ConcurrentHashMap<String, TopologyLock>(); } @@ -72,26 +64,6 @@ public class TopologyLockHierarchy { return topologyLockHierarchy; } - public void addApplicationLock (String appId, final TopologyLock topologyLock) { - - if (!applicationIdToTopologyLockMap.containsKey(appId)) { - synchronized (applicationIdToTopologyLockMap) { - if (!applicationIdToTopologyLockMap.containsKey(appId)) { - applicationIdToTopologyLockMap.put(appId, topologyLock); - log.info("Added lock for Application " + appId); - } - } - } else { - if (log.isDebugEnabled()) { - log.debug("Topology Lock for Application " + appId + " already exists"); - } - } - } - - public TopologyLock getTopologyLockForApplication (String appId) { - return applicationIdToTopologyLockMap.get(appId); - } - public void addServiceLock (String serviceName, final TopologyLock topologyLock) { if (!serviceNameToTopologyLockMap.containsKey(serviceName)) { @@ -132,16 +104,6 @@ public class TopologyLockHierarchy { return clusterIdToTopologyLockMap.get(clusterId); } - public void removeTopologyLockForApplication (String appId) { - if (applicationIdToTopologyLockMap.remove(appId) != null) { - log.info("Removed lock for Application " + appId); - } else { - if (log.isDebugEnabled()) { - log.debug("Lock already removed for Application " + appId); - } - } - } - public void removeTopologyLockForService (String serviceName) { if (serviceNameToTopologyLockMap.remove(serviceName) != null) { log.info("Removed lock for Service " + serviceName); @@ -166,10 +128,6 @@ public class TopologyLockHierarchy { return serviceLock; } - public TopologyLock getApplicatioLock() { - return applicatioLock; - } - public TopologyLock getCompleteTopologyLock() { return completeTopologyLock; } http://git-wip-us.apache.org/repos/asf/stratos/blob/c42d7c1d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationActivatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationActivatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationActivatedMessageProcessor.java index 6c82925..da75265 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationActivatedMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationActivatedMessageProcessor.java @@ -22,9 +22,11 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.messaging.domain.applications.Application; import org.apache.stratos.messaging.domain.applications.ApplicationStatus; +import org.apache.stratos.messaging.domain.applications.Applications; import org.apache.stratos.messaging.domain.topology.Topology; import org.apache.stratos.messaging.event.topology.ApplicationActivatedEvent; import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.processor.applications.updater.ApplicationsUpdater; import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater; import org.apache.stratos.messaging.util.Util; @@ -46,40 +48,40 @@ public class ApplicationActivatedMessageProcessor extends MessageProcessor { @Override public boolean process(String type, String message, Object object) { - Topology topology = (Topology) object; + Applications applications = (Applications) object; if (ApplicationActivatedEvent.class.getName().equals(type)) { - // Return if topology has not been initialized - if (!topology.isInitialized()) + // Return if applications has not been initialized + if (!applications.isInitialized()) return false; // Parse complete message and build event ApplicationActivatedEvent event = (ApplicationActivatedEvent) Util. jsonToObject(message, ApplicationActivatedEvent.class); - TopologyUpdater.acquireWriteLockForApplication(event.getAppId()); + ApplicationsUpdater.acquireWriteLockForApplication(event.getAppId()); try { - return doProcess(event, topology); + return doProcess(event, applications); } finally { - TopologyUpdater.releaseWriteLockForApplication(event.getAppId()); + ApplicationsUpdater.releaseWriteLockForApplication(event.getAppId()); } } else { if (nextProcessor != null) { // ask the next processor to take care of the message. - return nextProcessor.process(type, message, topology); + return nextProcessor.process(type, message, applications); } else { throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); } } } - private boolean doProcess(ApplicationActivatedEvent event, Topology topology) { + private boolean doProcess(ApplicationActivatedEvent event, Applications applications) { - // Validate event against the existing topology - Application application = topology.getApplication(event.getAppId()); + // Validate event against the existing applications + Application application = applications.getApplication(event.getAppId()); if (application == null) { if (log.isWarnEnabled()) { log.warn(String.format("Application does not exist: [service] %s", @@ -87,7 +89,7 @@ public class ApplicationActivatedMessageProcessor extends MessageProcessor { } return false; } else { - // Apply changes to the topology + // Apply changes to the applications if (!application.isStateTransitionValid(ApplicationStatus.Active)) { log.error("Invalid State transfer from [ " + application.getStatus() + " ] to [ " + ApplicationStatus.Active + " ]"); http://git-wip-us.apache.org/repos/asf/stratos/blob/c42d7c1d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationCreatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationCreatedMessageProcessor.java index db8e3c8..cfe8500 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationCreatedMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationCreatedMessageProcessor.java @@ -22,15 +22,11 @@ package org.apache.stratos.messaging.message.processor.applications; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.messaging.domain.applications.Applications; -import org.apache.stratos.messaging.domain.applications.ClusterDataHolder; -import org.apache.stratos.messaging.domain.topology.Cluster; import org.apache.stratos.messaging.event.applications.ApplicationCreatedEvent; import org.apache.stratos.messaging.message.processor.MessageProcessor; -import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater; +import org.apache.stratos.messaging.message.processor.applications.updater.ApplicationsUpdater; import org.apache.stratos.messaging.util.Util; -import java.util.Set; - public class ApplicationCreatedMessageProcessor extends MessageProcessor { private static final Log log = LogFactory.getLog(ApplicationCreatedMessageProcessor.class); @@ -57,12 +53,12 @@ public class ApplicationCreatedMessageProcessor extends MessageProcessor { return false; } - TopologyUpdater.acquireWriteLockForApplications(); + ApplicationsUpdater.acquireWriteLockForApplications(); try { return doProcess(event, applications); } finally { - TopologyUpdater.releaseWriteLockForApplications(); + ApplicationsUpdater.releaseWriteLockForApplications(); } } else { @@ -75,7 +71,7 @@ public class ApplicationCreatedMessageProcessor extends MessageProcessor { } } - private boolean doProcess(ApplicationCreatedEvent event, Applications topology) { + private boolean doProcess(ApplicationCreatedEvent event, Applications applications) { // check if required properties are available if (event.getApplication() == null) { @@ -90,13 +86,13 @@ public class ApplicationCreatedMessageProcessor extends MessageProcessor { throw new RuntimeException(errorMsg); } - // check if an Application with same name exists in topology - if (topology.applicationExists(event.getApplication().getUniqueIdentifier())) { + // check if an Application with same name exists in applications + if (applications.applicationExists(event.getApplication().getUniqueIdentifier())) { log.warn("Application with id [ " + event.getApplication().getUniqueIdentifier() + " ] already exists in Topology"); } else { // add application and the clusters to Topology - topology.addApplication(event.getApplication()); + applications.addApplication(event.getApplication()); } notifyEventListeners(event); http://git-wip-us.apache.org/repos/asf/stratos/blob/c42d7c1d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationInactivatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationInactivatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationInactivatedMessageProcessor.java index 91eae8c..e97e3fc 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationInactivatedMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationInactivatedMessageProcessor.java @@ -22,9 +22,11 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.messaging.domain.applications.Application; import org.apache.stratos.messaging.domain.applications.ApplicationStatus; +import org.apache.stratos.messaging.domain.applications.Applications; import org.apache.stratos.messaging.domain.topology.Topology; import org.apache.stratos.messaging.event.topology.ApplicationInactivatedEvent; import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.processor.applications.updater.ApplicationsUpdater; import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater; import org.apache.stratos.messaging.util.Util; @@ -46,40 +48,40 @@ public class ApplicationInactivatedMessageProcessor extends MessageProcessor { @Override public boolean process(String type, String message, Object object) { - Topology topology = (Topology) object; + Applications applications = (Applications) object; if (ApplicationInactivatedEvent.class.getName().equals(type)) { - // Return if topology has not been initialized - if (!topology.isInitialized()) + // Return if applications has not been initialized + if (!applications.isInitialized()) return false; // Parse complete message and build event ApplicationInactivatedEvent event = (ApplicationInactivatedEvent) Util. jsonToObject(message, ApplicationInactivatedEvent.class); - TopologyUpdater.acquireWriteLockForApplication(event.getAppId()); + ApplicationsUpdater.acquireWriteLockForApplication(event.getAppId()); try { - return doProcess(event, topology); + return doProcess(event, applications); } finally { - TopologyUpdater.releaseWriteLockForApplication(event.getAppId()); + ApplicationsUpdater.releaseWriteLockForApplication(event.getAppId()); } } else { if (nextProcessor != null) { // ask the next processor to take care of the message. - return nextProcessor.process(type, message, topology); + return nextProcessor.process(type, message, applications); } else { throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); } } } - private boolean doProcess (ApplicationInactivatedEvent event, Topology topology) { + private boolean doProcess (ApplicationInactivatedEvent event, Applications applications) { - // Validate event against the existing topology - Application application = topology.getApplication(event.getAppId()); + // Validate event against the existing applications + Application application = applications.getApplication(event.getAppId()); if (application == null) { if (log.isWarnEnabled()) { log.warn(String.format("Application does not exist: [service] %s", @@ -87,7 +89,7 @@ public class ApplicationInactivatedMessageProcessor extends MessageProcessor { } return false; } else { - // Apply changes to the topology + // Apply changes to the applications if (!application.isStateTransitionValid(ApplicationStatus.Inactive)) { log.error("Invalid State transfer from [ " + application.getStatus() + " ] to [ " + ApplicationStatus.Inactive + " ]"); http://git-wip-us.apache.org/repos/asf/stratos/blob/c42d7c1d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatedMessageProcessor.java index 8cd2182..99d08fe 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatedMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatedMessageProcessor.java @@ -20,10 +20,12 @@ package org.apache.stratos.messaging.message.processor.applications; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.domain.applications.Applications; import org.apache.stratos.messaging.domain.applications.ClusterDataHolder; import org.apache.stratos.messaging.domain.topology.*; import org.apache.stratos.messaging.event.topology.ApplicationTerminatedEvent; import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.processor.applications.updater.ApplicationsUpdater; import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater; import org.apache.stratos.messaging.util.Util; @@ -47,18 +49,18 @@ public class ApplicationTerminatedMessageProcessor extends MessageProcessor { @Override public boolean process(String type, String message, Object object) { - Topology topology = (Topology) object; + Applications applications = (Applications) object; if (ApplicationTerminatedEvent.class.getName().equals(type)) { - // Return if topology has not been initialized - if (!topology.isInitialized()) + // Return if applications has not been initialized + if (!applications.isInitialized()) return false; // Parse complete message and build event ApplicationTerminatedEvent event = (ApplicationTerminatedEvent) Util. jsonToObject(message, ApplicationTerminatedEvent.class); - TopologyUpdater.acquireWriteLockForApplications(); + ApplicationsUpdater.acquireWriteLockForApplications(); Set<ClusterDataHolder> clusterDataHolders = event.getClusterData(); if (clusterDataHolders != null) { for (ClusterDataHolder clusterData : clusterDataHolders) { @@ -67,10 +69,10 @@ public class ApplicationTerminatedMessageProcessor extends MessageProcessor { } try { - return doProcess(event, topology); + return doProcess(event, applications); } finally { - TopologyUpdater.releaseWriteLockForApplications(); + ApplicationsUpdater.releaseWriteLockForApplications(); if (clusterDataHolders != null) { for (ClusterDataHolder clusterData : clusterDataHolders) { TopologyUpdater.releaseWriteLockForService(clusterData.getServiceType()); @@ -81,14 +83,14 @@ public class ApplicationTerminatedMessageProcessor extends MessageProcessor { } else { if (nextProcessor != null) { // ask the next processor to take care of the message. - return nextProcessor.process(type, message, topology); + return nextProcessor.process(type, message, applications); } else { throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); } } } - private boolean doProcess (ApplicationTerminatedEvent event, Topology topology) { + private boolean doProcess (ApplicationTerminatedEvent event, Applications applications) { // check if required properties are available if (event.getAppId() == null) { @@ -103,25 +105,26 @@ public class ApplicationTerminatedMessageProcessor extends MessageProcessor { throw new RuntimeException(errorMsg); } - // check if an Application with same name exists in topology + // check if an Application with same name exists in applications String appId = event.getAppId(); - if (topology.applicationExists(appId)) { + if (applications.applicationExists(appId)) { log.warn("Application with id [ " + appId + " ] still exists in Topology, removing it"); - topology.removeApplication(appId); + applications.removeApplication(appId); } if (event.getClusterData() != null) { // remove the Clusters from the Topology for (ClusterDataHolder clusterData : event.getClusterData()) { - Service service = topology.getService(clusterData.getServiceType()); - if (service != null) { - service.removeCluster(clusterData.getClusterId()); - if (log.isDebugEnabled()) { - log.debug("Removed the Cluster " + clusterData.getClusterId() + " from Topology"); - } - } else { - log.warn("Service " + clusterData.getServiceType() + " not found in Topology!"); - } + log.info("################################ TODO ################################"); +// Service service = applications.getService(clusterData.getServiceType()); +// if (service != null) { +// service.removeCluster(clusterData.getClusterId()); +// if (log.isDebugEnabled()) { +// log.debug("Removed the Cluster " + clusterData.getClusterId() + " from Topology"); +// } +// } else { +// log.warn("Service " + clusterData.getServiceType() + " not found in Topology!"); +// } } } http://git-wip-us.apache.org/repos/asf/stratos/blob/c42d7c1d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatingMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatingMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatingMessageProcessor.java index 057d013..633e080 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatingMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatingMessageProcessor.java @@ -22,9 +22,11 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.messaging.domain.applications.Application; import org.apache.stratos.messaging.domain.applications.ApplicationStatus; +import org.apache.stratos.messaging.domain.applications.Applications; import org.apache.stratos.messaging.domain.topology.Topology; import org.apache.stratos.messaging.event.topology.ApplicationTerminatingEvent; import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.processor.applications.updater.ApplicationsUpdater; import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater; import org.apache.stratos.messaging.util.Util; @@ -46,40 +48,40 @@ public class ApplicationTerminatingMessageProcessor extends MessageProcessor { @Override public boolean process(String type, String message, Object object) { - Topology topology = (Topology) object; + Applications applications = (Applications) object; if (ApplicationTerminatingEvent.class.getName().equals(type)) { - // Return if topology has not been initialized - if (!topology.isInitialized()) + // Return if applications has not been initialized + if (!applications.isInitialized()) return false; // Parse complete message and build event ApplicationTerminatingEvent event = (ApplicationTerminatingEvent) Util. jsonToObject(message, ApplicationTerminatingEvent.class); - TopologyUpdater.acquireWriteLockForApplication(event.getAppId()); + ApplicationsUpdater.acquireWriteLockForApplication(event.getAppId()); try { - return doProcess(event, topology); + return doProcess(event, applications); } finally { - TopologyUpdater.releaseWriteLockForApplication(event.getAppId()); + ApplicationsUpdater.releaseWriteLockForApplication(event.getAppId()); } } else { if (nextProcessor != null) { // ask the next processor to take care of the message. - return nextProcessor.process(type, message, topology); + return nextProcessor.process(type, message, applications); } else { throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); } } } - private boolean doProcess (ApplicationTerminatingEvent event, Topology topology) { + private boolean doProcess (ApplicationTerminatingEvent event, Applications applications) { - // Validate event against the existing topology - Application application = topology.getApplication(event.getAppId()); + // Validate event against the existing applications + Application application = applications.getApplication(event.getAppId()); if (application == null) { if (log.isWarnEnabled()) { log.warn(String.format("Application does not exist: [service] %s", @@ -87,7 +89,7 @@ public class ApplicationTerminatingMessageProcessor extends MessageProcessor { } return false; } else { - // Apply changes to the topology + // Apply changes to the applications if (!application.isStateTransitionValid(ApplicationStatus.Terminating)) { log.error("Invalid State transfer from [ " + application.getStatus() + " ] to [ " + ApplicationStatus.Terminating + " ]"); http://git-wip-us.apache.org/repos/asf/stratos/blob/c42d7c1d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationUndeployedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationUndeployedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationUndeployedMessageProcessor.java index 7e91ab8..e911b77 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationUndeployedMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationUndeployedMessageProcessor.java @@ -23,10 +23,12 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.messaging.domain.applications.Application; import org.apache.stratos.messaging.domain.applications.ApplicationStatus; +import org.apache.stratos.messaging.domain.applications.Applications; import org.apache.stratos.messaging.domain.applications.ClusterDataHolder; import org.apache.stratos.messaging.domain.topology.*; import org.apache.stratos.messaging.event.topology.ApplicationUndeployedEvent; import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.processor.applications.updater.ApplicationsUpdater; import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater; import org.apache.stratos.messaging.util.Util; @@ -46,10 +48,10 @@ public class ApplicationUndeployedMessageProcessor extends MessageProcessor { @Override public boolean process(String type, String message, Object object) { - Topology topology = (Topology) object; + Applications applications = (Applications) object; if (ApplicationUndeployedEvent.class.getName().equals(type)) { - if (!topology.isInitialized()) { + if (!applications.isInitialized()) { return false; } @@ -61,7 +63,7 @@ public class ApplicationUndeployedMessageProcessor extends MessageProcessor { } // get write lock for the application and relevant Clusters - TopologyUpdater.acquireWriteLockForApplication(event.getApplicationId()); + ApplicationsUpdater.acquireWriteLockForApplication(event.getApplicationId()); Set<ClusterDataHolder> clusterDataHolders = event.getClusterData(); if (clusterDataHolders != null) { for (ClusterDataHolder clusterData : clusterDataHolders) { @@ -71,7 +73,7 @@ public class ApplicationUndeployedMessageProcessor extends MessageProcessor { } try { - return doProcess(event, topology); + return doProcess(event, applications); } finally { // remove locks @@ -81,13 +83,13 @@ public class ApplicationUndeployedMessageProcessor extends MessageProcessor { clusterData.getClusterId()); } } - TopologyUpdater.releaseWriteLockForApplication(event.getApplicationId()); + ApplicationsUpdater.releaseWriteLockForApplication(event.getApplicationId()); } } else { if (nextProcessor != null) { // ask the next processor to take care of the message. - return nextProcessor.process(type, message, topology); + return nextProcessor.process(type, message, applications); } else { throw new RuntimeException(String.format ("Failed to process message using available message processors: [type] %s [body] %s", type, message)); @@ -95,10 +97,10 @@ public class ApplicationUndeployedMessageProcessor extends MessageProcessor { } } - private boolean doProcess (ApplicationUndeployedEvent event, Topology topology) { + private boolean doProcess (ApplicationUndeployedEvent event, Applications applications) { // update the application status to Terminating - Application application = topology.getApplication(event.getApplicationId()); + Application application = applications.getApplication(event.getApplicationId()); // check and update application status to 'Terminating' if (!application.isStateTransitionValid(ApplicationStatus.Terminating)) { log.error("Invalid state transfer from " + application.getStatus() + " to " + ApplicationStatus.Terminating); @@ -110,27 +112,28 @@ public class ApplicationUndeployedMessageProcessor extends MessageProcessor { Set<ClusterDataHolder> clusterData = application.getClusterDataRecursively(); // update the Cluster statuses to Terminating for (ClusterDataHolder clusterDataHolder : clusterData) { - Service service = topology.getService(clusterDataHolder.getServiceType()); - if (service != null) { - Cluster aCluster = service.getCluster(clusterDataHolder.getClusterId()); - if (aCluster != null) { - // validate state transition - if (!aCluster.isStateTransitionValid(ClusterStatus.Terminating)) { - log.error("Invalid state transfer from " + aCluster.getStatus() + " to " - + ClusterStatus.Terminating); - } - // for now anyway update the status forcefully - aCluster.setStatus(ClusterStatus.Terminating); - - } else { - log.warn("Unable to find Cluster with cluster id " + clusterDataHolder.getClusterId() + - " in Topology"); - } - - } else { - log.warn("Unable to remove cluster with cluster id: " + clusterDataHolder.getClusterId() + " from Topology, " + - " associated Service [ " + clusterDataHolder.getServiceType() + " ] npt found"); - } + log.info("############################### TODO ###############################"); +// Service service = applications.getService(clusterDataHolder.getServiceType()); +// if (service != null) { +// Cluster aCluster = service.getCluster(clusterDataHolder.getClusterId()); +// if (aCluster != null) { +// // validate state transition +// if (!aCluster.isStateTransitionValid(ClusterStatus.Terminating)) { +// log.error("Invalid state transfer from " + aCluster.getStatus() + " to " +// + ClusterStatus.Terminating); +// } +// // for now anyway update the status forcefully +// aCluster.setStatus(ClusterStatus.Terminating); +// +// } else { +// log.warn("Unable to find Cluster with cluster id " + clusterDataHolder.getClusterId() + +// " in Topology"); +// } +// +// } else { +// log.warn("Unable to remove cluster with cluster id: " + clusterDataHolder.getClusterId() + " from Topology, " + +// " associated Service [ " + clusterDataHolder.getServiceType() + " ] npt found"); +// } } notifyEventListeners(event); http://git-wip-us.apache.org/repos/asf/stratos/blob/c42d7c1d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/CompleteApplicationsMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/CompleteApplicationsMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/CompleteApplicationsMessageProcessor.java index 53c469b..c9af67a 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/CompleteApplicationsMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/CompleteApplicationsMessageProcessor.java @@ -22,25 +22,12 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.messaging.domain.applications.Application; import org.apache.stratos.messaging.domain.applications.Applications; -import org.apache.stratos.messaging.domain.topology.Cluster; -import org.apache.stratos.messaging.domain.topology.Member; -import org.apache.stratos.messaging.domain.topology.Service; -import org.apache.stratos.messaging.domain.topology.Topology; -import org.apache.stratos.messaging.domain.topology.locking.TopologyLock; -import org.apache.stratos.messaging.domain.topology.locking.TopologyLockHierarchy; import org.apache.stratos.messaging.event.applications.CompleteApplicationsEvent; -import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent; -import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter; -import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter; -import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter; import org.apache.stratos.messaging.message.processor.MessageProcessor; import org.apache.stratos.messaging.message.processor.applications.updater.ApplicationsUpdater; -import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater; import org.apache.stratos.messaging.util.Util; -import java.util.ArrayList; import java.util.Collection; -import java.util.List; public class CompleteApplicationsMessageProcessor extends MessageProcessor { @@ -62,13 +49,13 @@ public class CompleteApplicationsMessageProcessor extends MessageProcessor { jsonToObject(message, CompleteApplicationsEvent.class); if (!applications.isInitialized()) { - ApplicationsUpdater.acquireWriteLock(); + ApplicationsUpdater.acquireWriteLockForApplications(); try { doProcess(event, applications); } finally { - ApplicationsUpdater.releaseWriteLock(); + ApplicationsUpdater.releaseWriteLockForApplications(); } } http://git-wip-us.apache.org/repos/asf/stratos/blob/c42d7c1d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupActivatedProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupActivatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupActivatedProcessor.java index 845e933..5c8d477 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupActivatedProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupActivatedProcessor.java @@ -21,11 +21,13 @@ package org.apache.stratos.messaging.message.processor.applications; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.messaging.domain.applications.Application; +import org.apache.stratos.messaging.domain.applications.Applications; import org.apache.stratos.messaging.domain.applications.Group; import org.apache.stratos.messaging.domain.applications.GroupStatus; import org.apache.stratos.messaging.domain.topology.*; import org.apache.stratos.messaging.event.topology.GroupActivatedEvent; import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.processor.applications.updater.ApplicationsUpdater; import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater; import org.apache.stratos.messaging.util.Util; @@ -43,40 +45,40 @@ public class GroupActivatedProcessor extends MessageProcessor { @Override public boolean process(String type, String message, Object object) { - Topology topology = (Topology) object; + Applications applications = (Applications) object; if (GroupActivatedEvent.class.getName().equals(type)) { - // Return if topology has not been initialized - if (!topology.isInitialized()) + // Return if applications has not been initialized + if (!applications.isInitialized()) return false; // Parse complete message and build event GroupActivatedEvent event = (GroupActivatedEvent) Util. jsonToObject(message, GroupActivatedEvent.class); - TopologyUpdater.acquireWriteLockForApplication(event.getAppId()); + ApplicationsUpdater.acquireWriteLockForApplication(event.getAppId()); try { - return doProcess(event, topology); + return doProcess(event, applications); } finally { - TopologyUpdater.releaseWriteLockForApplication(event.getAppId()); + ApplicationsUpdater.releaseWriteLockForApplication(event.getAppId()); } } else { if (nextProcessor != null) { // ask the next processor to take care of the message. - return nextProcessor.process(type, message, topology); + return nextProcessor.process(type, message, applications); } else { throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); } } } - private boolean doProcess (GroupActivatedEvent event,Topology topology) { + private boolean doProcess (GroupActivatedEvent event, Applications applications) { // Validate event against the existing topology - Application application = topology.getApplication(event.getAppId()); + Application application = applications.getApplication(event.getAppId()); if (application == null) { if (log.isWarnEnabled()) { log.warn(String.format("Application does not exist: [service] %s", http://git-wip-us.apache.org/repos/asf/stratos/blob/c42d7c1d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupCreatedProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupCreatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupCreatedProcessor.java index 47d4457..67861e4 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupCreatedProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupCreatedProcessor.java @@ -21,11 +21,13 @@ package org.apache.stratos.messaging.message.processor.applications; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.messaging.domain.applications.Application; +import org.apache.stratos.messaging.domain.applications.Applications; import org.apache.stratos.messaging.domain.applications.Group; import org.apache.stratos.messaging.domain.applications.GroupStatus; import org.apache.stratos.messaging.domain.topology.Topology; import org.apache.stratos.messaging.event.topology.GroupCreatedEvent; import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.processor.applications.updater.ApplicationsUpdater; import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater; import org.apache.stratos.messaging.util.Util; @@ -43,40 +45,40 @@ public class GroupCreatedProcessor extends MessageProcessor { @Override public boolean process(String type, String message, Object object) { - Topology topology = (Topology) object; + Applications applications = (Applications) object; if (GroupCreatedEvent.class.getName().equals(type)) { - // Return if topology has not been initialized - if (!topology.isInitialized()) + // Return if applications has not been initialized + if (!applications.isInitialized()) return false; // Parse complete message and build event GroupCreatedEvent event = (GroupCreatedEvent) Util. jsonToObject(message, GroupCreatedEvent.class); - TopologyUpdater.acquireWriteLockForApplication(event.getAppId()); + ApplicationsUpdater.acquireWriteLockForApplication(event.getAppId()); try { - return doProcess(event, topology); + return doProcess(event, applications); } finally { - TopologyUpdater.releaseWriteLockForApplication(event.getAppId()); + ApplicationsUpdater.releaseWriteLockForApplication(event.getAppId()); } } else { if (nextProcessor != null) { // ask the next processor to take care of the message. - return nextProcessor.process(type, message, topology); + return nextProcessor.process(type, message, applications); } else { throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); } } } - private boolean doProcess (GroupCreatedEvent event,Topology topology) { + private boolean doProcess (GroupCreatedEvent event,Applications applications) { - // Validate event against the existing topology - Application application = topology.getApplication(event.getAppId()); + // Validate event against the existing applications + Application application = applications.getApplication(event.getAppId()); if (application == null) { if (log.isWarnEnabled()) { log.warn(String.format("Application does not exist: [service] %s", @@ -92,7 +94,7 @@ public class GroupCreatedProcessor extends MessageProcessor { event.getGroupId())); } } else { - // Apply changes to the topology + // Apply changes to the applications if (!group.isStateTransitionValid(GroupStatus.Created)) { log.error("Invalid State Transition from " + group.getStatus() + " to " + GroupStatus.Created + " " + "for Group " + group.getAlias()); http://git-wip-us.apache.org/repos/asf/stratos/blob/c42d7c1d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInActivateProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInActivateProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInActivateProcessor.java index 063a3de..4f2e581 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInActivateProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInActivateProcessor.java @@ -21,11 +21,13 @@ package org.apache.stratos.messaging.message.processor.applications; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.messaging.domain.applications.Application; +import org.apache.stratos.messaging.domain.applications.Applications; import org.apache.stratos.messaging.domain.applications.Group; import org.apache.stratos.messaging.domain.applications.GroupStatus; import org.apache.stratos.messaging.domain.topology.Topology; import org.apache.stratos.messaging.event.topology.GroupInactivateEvent; import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.processor.applications.updater.ApplicationsUpdater; import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater; import org.apache.stratos.messaging.util.Util; @@ -43,40 +45,40 @@ public class GroupInActivateProcessor extends MessageProcessor { @Override public boolean process(String type, String message, Object object) { - Topology topology = (Topology) object; + Applications applications = (Applications) object; if (GroupInactivateEvent.class.getName().equals(type)) { - // Return if topology has not been initialized - if (!topology.isInitialized()) + // Return if applications has not been initialized + if (!applications.isInitialized()) return false; // Parse complete message and build event GroupInactivateEvent event = (GroupInactivateEvent) Util. jsonToObject(message, GroupInactivateEvent.class); - TopologyUpdater.acquireWriteLockForApplication(event.getAppId()); + ApplicationsUpdater.acquireWriteLockForApplication(event.getAppId()); try { - return doProcess(event, topology); + return doProcess(event, applications); } finally { - TopologyUpdater.releaseWriteLockForApplication(event.getAppId()); + ApplicationsUpdater.releaseWriteLockForApplication(event.getAppId()); } } else { if (nextProcessor != null) { // ask the next processor to take care of the message. - return nextProcessor.process(type, message, topology); + return nextProcessor.process(type, message, applications); } else { throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); } } } - private boolean doProcess(GroupInactivateEvent event, Topology topology) { + private boolean doProcess(GroupInactivateEvent event, Applications applications) { - // Validate event against the existing topology - Application application = topology.getApplication(event.getAppId()); + // Validate event against the existing applications + Application application = applications.getApplication(event.getAppId()); if (application == null) { if (log.isWarnEnabled()) { log.warn(String.format("Application does not exist: [service] %s", http://git-wip-us.apache.org/repos/asf/stratos/blob/c42d7c1d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatedProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatedProcessor.java index 3de0914..6e985b7 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatedProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatedProcessor.java @@ -21,11 +21,13 @@ package org.apache.stratos.messaging.message.processor.applications; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.messaging.domain.applications.Application; +import org.apache.stratos.messaging.domain.applications.Applications; import org.apache.stratos.messaging.domain.applications.Group; import org.apache.stratos.messaging.domain.applications.GroupStatus; import org.apache.stratos.messaging.domain.topology.Topology; import org.apache.stratos.messaging.event.topology.GroupTerminatedEvent; import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.processor.applications.updater.ApplicationsUpdater; import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater; import org.apache.stratos.messaging.util.Util; @@ -43,40 +45,40 @@ public class GroupTerminatedProcessor extends MessageProcessor { @Override public boolean process(String type, String message, Object object) { - Topology topology = (Topology) object; + Applications applications = (Applications) object; if (GroupTerminatedEvent.class.getName().equals(type)) { - // Return if topology has not been initialized - if (!topology.isInitialized()) + // Return if applications has not been initialized + if (!applications.isInitialized()) return false; // Parse complete message and build event GroupTerminatedEvent event = (GroupTerminatedEvent) Util. jsonToObject(message, GroupTerminatedEvent.class); - TopologyUpdater.acquireWriteLockForApplication(event.getAppId()); + ApplicationsUpdater.acquireWriteLockForApplication(event.getAppId()); try { - return doProcess(event, topology); + return doProcess(event, applications); } finally { - TopologyUpdater.releaseWriteLockForApplication(event.getAppId()); + ApplicationsUpdater.releaseWriteLockForApplication(event.getAppId()); } } else { if (nextProcessor != null) { // ask the next processor to take care of the message. - return nextProcessor.process(type, message, topology); + return nextProcessor.process(type, message, applications); } else { throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); } } } - private boolean doProcess (GroupTerminatedEvent event,Topology topology) { + private boolean doProcess (GroupTerminatedEvent event, Applications applications) { - // Validate event against the existing topology - Application application = topology.getApplication(event.getAppId()); + // Validate event against the existing applications + Application application = applications.getApplication(event.getAppId()); if (application == null) { if (log.isWarnEnabled()) { log.warn(String.format("Application does not exist: [service] %s", @@ -92,7 +94,7 @@ public class GroupTerminatedProcessor extends MessageProcessor { event.getGroupId())); } } else { - // Apply changes to the topology + // Apply changes to the applications if (!group.isStateTransitionValid(GroupStatus.Terminated)) { log.error("Invalid State Transition from " + group.getStatus() + " to " + GroupStatus.Terminated); } http://git-wip-us.apache.org/repos/asf/stratos/blob/c42d7c1d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatingProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatingProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatingProcessor.java index e124b7b..c9a136f 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatingProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatingProcessor.java @@ -21,11 +21,13 @@ package org.apache.stratos.messaging.message.processor.applications; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.messaging.domain.applications.Application; +import org.apache.stratos.messaging.domain.applications.Applications; import org.apache.stratos.messaging.domain.applications.Group; import org.apache.stratos.messaging.domain.applications.GroupStatus; import org.apache.stratos.messaging.domain.topology.Topology; import org.apache.stratos.messaging.event.topology.GroupTerminatingEvent; import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.processor.applications.updater.ApplicationsUpdater; import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater; import org.apache.stratos.messaging.util.Util; @@ -43,40 +45,40 @@ public class GroupTerminatingProcessor extends MessageProcessor { @Override public boolean process(String type, String message, Object object) { - Topology topology = (Topology) object; + Applications applications = (Applications) object; if (GroupTerminatingEvent.class.getName().equals(type)) { - // Return if topology has not been initialized - if (!topology.isInitialized()) + // Return if applications has not been initialized + if (!applications.isInitialized()) return false; // Parse complete message and build event GroupTerminatingEvent event = (GroupTerminatingEvent) Util. jsonToObject(message, GroupTerminatingEvent.class); - TopologyUpdater.acquireWriteLockForApplication(event.getAppId()); + ApplicationsUpdater.acquireWriteLockForApplication(event.getAppId()); try { - return doProcess(event, topology); + return doProcess(event, applications); } finally { - TopologyUpdater.releaseWriteLockForApplication(event.getAppId()); + ApplicationsUpdater.releaseWriteLockForApplication(event.getAppId()); } } else { if (nextProcessor != null) { // ask the next processor to take care of the message. - return nextProcessor.process(type, message, topology); + return nextProcessor.process(type, message, applications); } else { throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); } } } - private boolean doProcess (GroupTerminatingEvent event,Topology topology) { + private boolean doProcess (GroupTerminatingEvent event,Applications applications) { - // Validate event against the existing topology - Application application = topology.getApplication(event.getAppId()); + // Validate event against the existing applications + Application application = applications.getApplication(event.getAppId()); if (application == null) { if (log.isWarnEnabled()) { log.warn(String.format("Application does not exist: [service] %s", @@ -92,7 +94,7 @@ public class GroupTerminatingProcessor extends MessageProcessor { event.getGroupId())); } } else { - // Apply changes to the topology + // Apply changes to the applications if (!group.isStateTransitionValid(GroupStatus.Terminating)) { log.error("Invalid State Transition from " + group.getStatus() + " to " + GroupStatus.Active); }
