Repository: stratos
Updated Branches:
  refs/heads/4.0.0-grouping 25221cc3f -> fe38bdcfc
adding cluster and group in-active listeners for topology and application status in the receiver side

Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/fe38bdcf
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/fe38bdcf
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/fe38bdcf

Branch: refs/heads/4.0.0-grouping
Commit: fe38bdcfc5f761ef46e78a5db3c0e7f34141f8d9
Parents: 25221cc
Author: reka <[email protected]>
Authored: Fri Oct 24 15:48:07 2014 +0530
Committer: reka <[email protected]>
Committed: Fri Oct 24 15:49:15 2014 +0530

----------------------------------------------------------------------
 .../grouping/topic/StatusEventPublisher.java | 72 +++++--------------
 .../AutoscalerTopologyEventReceiver.java | 14 ----
 .../status/checker/StatusChecker.java | 16 ++++-
 .../ApplicationStatusTopicReceiver.java | 16 +++++
 .../controller/topology/TopologyBuilder.java | 74 ++++++++++++++++++++
 .../topology/TopologyEventPublisher.java | 17 +++++
 6 files changed, 141 insertions(+), 68 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/stratos/blob/fe38bdcf/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/grouping/topic/StatusEventPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/grouping/topic/StatusEventPublisher.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/grouping/topic/StatusEventPublisher.java
index 668161a..630ffd2 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/grouping/topic/StatusEventPublisher.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/grouping/topic/StatusEventPublisher.java
@@ -11,6 +11,7 @@
import org.apache.stratos.messaging.event.application.status.ApplicationActivate import org.apache.stratos.messaging.event.application.status.ApplicationInactivatedEvent; import org.apache.stratos.messaging.event.application.status.ApplicationTerminatedEvent; import org.apache.stratos.messaging.event.application.status.ClusterActivatedEvent; +import org.apache.stratos.messaging.event.application.status.ClusterInActivateEvent; import org.apache.stratos.messaging.event.application.status.ClusterMaintenanceModeEvent; import org.apache.stratos.messaging.event.application.status.GroupActivatedEvent; import org.apache.stratos.messaging.event.topology.*; @@ -32,10 +33,10 @@ public class StatusEventPublisher { " [cluster]: " + clusterId); } - //TODO cluster - ClusterCreatedEvent clusterActivatedEvent = new ClusterCreatedEvent(appId, serviceName, null); - publishEvent(clusterActivatedEvent); + ClusterCreatedEvent clusterCreatedEvent = new ClusterCreatedEvent(appId, serviceName, clusterId); + + publishEvent(clusterCreatedEvent); } public static void sendClusterActivatedEvent(String appId, String serviceName, String clusterId) { @@ -45,7 +46,8 @@ public class StatusEventPublisher { " [cluster]: " + clusterId); } - ClusterActivatedEvent clusterActivatedEvent = new ClusterActivatedEvent(appId, serviceName, clusterId); + ClusterActivatedEvent clusterActivatedEvent = + new ClusterActivatedEvent(appId, serviceName, clusterId); publishEvent(clusterActivatedEvent); } @@ -57,9 +59,10 @@ public class StatusEventPublisher { " [cluster]: " + clusterId); } - /*ClusterActivatedEvent clusterActivatedEvent = new ClusterActivatedEvent(appId, serviceName, clusterId); + ClusterInActivateEvent clusterInActivateEvent = + new ClusterInActivateEvent(appId, serviceName, clusterId); - publishEvent(clusterActivatedEvent);*/ + publishEvent(clusterInActivateEvent); } public static void sendClusterTerminatingEvent(String appId, String serviceName, String clusterId) { @@ -68,7 +71,7 @@ public class 
StatusEventPublisher { log.info("Publishing Cluster in-activate event for [application]: " + appId + " [cluster]: " + clusterId); } - + //TODO /*ClusterActivatedEvent clusterActivatedEvent = new ClusterActivatedEvent(appId, serviceName, clusterId); publishEvent(clusterActivatedEvent);*/ @@ -80,36 +83,12 @@ public class StatusEventPublisher { log.info("Publishing Cluster in-activate event for [application]: " + appId + " [cluster]: " + clusterId); } - - /*ClusterActivatedEvent clusterActivatedEvent = new ClusterActivatedEvent(appId, serviceName, clusterId); + //TODO + /* Cluster clusterActivatedEvent = new ClusterActivatedEvent(appId, serviceName, clusterId); publishEvent(clusterActivatedEvent);*/ } - public static void sendClusterInMaintenanceEvent(String appId, String serviceName, String clusterId) { - - if (log.isInfoEnabled()) { - log.info("Publishing Cluster in_maintenance event for [application]: " + appId + - " [cluster]: " + clusterId); - } - - ClusterMaintenanceModeEvent clusterInMaintenanceEvent = - new ClusterMaintenanceModeEvent(appId, serviceName, clusterId); - - publishEvent(clusterInMaintenanceEvent); - } - - public static void sendGroupCreatedEvent(String appId, String groupId) { - - if (log.isInfoEnabled()) { - log.info("Publishing Group activated event for [application]: " + appId + - " [group]: " + groupId); - } - -/* - publishEvent(groupActivatedEvent);*/ - } - public static void sendGroupActivatedEvent(String appId, String groupId) { if (log.isInfoEnabled()) { @@ -141,9 +120,9 @@ public class StatusEventPublisher { " [group]: " + groupId); } - GroupInActivateEvent groupInActivateEvent = new GroupInActivateEvent(appId, groupId); + GroupInTerminatingEvent groupInTerminatingEvent = new GroupInTerminatingEvent(appId, groupId); - publishEvent(groupInActivateEvent); + publishEvent(groupInTerminatingEvent); } public static void sendGroupTerminatedEvent(String appId, String groupId) { @@ -153,9 +132,9 @@ public class StatusEventPublisher { " [group]: " 
+ groupId); } - GroupInActivateEvent groupInActivateEvent = new GroupInActivateEvent(appId, groupId); + GroupInTerminatedEvent groupInTerminatedEvent = new GroupInTerminatedEvent(appId, groupId); - publishEvent(groupInActivateEvent); + publishEvent(groupInTerminatedEvent); } public static void sendApplicationActivatedEvent(String appId) { @@ -179,27 +158,14 @@ public class StatusEventPublisher { publishEvent(applicationInActivatedEvent); } - public static void sendGroupInMaintenanceEvent(String appId, String groupId) { - + public static void sendApplicationTerminatedEvent (String appId, Set<ClusterDataHolder> clusterData) { if (log.isInfoEnabled()) { - log.info("Publishing Group in_maintenance event for [application]: " + appId + - " [group]: " + groupId); + log.info("Publishing Application terminated event for [application]: " + appId); } - GroupMaintenanceModeEvent groupMaintenanceModeEvent = - new GroupMaintenanceModeEvent(appId, groupId); - - publishEvent(groupMaintenanceModeEvent); - } - - public static void sendApplicationTerminatingEvent (String appId) { - // TODO: implement - } - - public static void sendApplicationTerminatedEvent (String appId, Set<ClusterDataHolder> clusterData) { - ApplicationTerminatedEvent applicationTerminatedEvent = new ApplicationTerminatedEvent(appId, clusterData); + publishEvent(applicationTerminatedEvent); } http://git-wip-us.apache.org/repos/asf/stratos/blob/fe38bdcf/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java index 211bcd2..ce5aff0 100644 --- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java @@ -155,8 +155,6 @@ public class AutoscalerTopologyEventReceiver implements Runnable { //changing the status in the monitor, will notify its parent monitor clusterMonitor.setStatus(ClusterStatus.Active); - //starting the status checker to decide on the status of it's parent - //StatusChecker.getInstance().onClusterStatusChange(clusterId, appId); } }); @@ -174,8 +172,6 @@ public class AutoscalerTopologyEventReceiver implements Runnable { //changing the status in the monitor, will notify its parent monitor clusterMonitor.setStatus(ClusterStatus.Created); - //starting the status checker to decide on the status of it's parent - //StatusChecker.getInstance().onClusterStatusChange(clusterId, appId); } }); @@ -194,8 +190,6 @@ public class AutoscalerTopologyEventReceiver implements Runnable { //changing the status in the monitor, will notify its parent monitor clusterMonitor.setStatus(ClusterStatus.Inactive); - //starting the status checker to decide on the status of it's parent - //StatusChecker.getInstance().onClusterStatusChange(clusterId, appId); } }); @@ -215,8 +209,6 @@ public class AutoscalerTopologyEventReceiver implements Runnable { //changing the status in the monitor, will notify its parent monitor monitor.setStatus(GroupStatus.Active); - //starting the status checker to decide on the status of it's parent - //StatusChecker.getInstance().onGroupStatusChange(groupId, appId); } }); @@ -236,8 +228,6 @@ public class AutoscalerTopologyEventReceiver implements Runnable { //changing the status in the monitor, will notify its parent monitor monitor.setStatus(GroupStatus.Inactive); - //starting the status checker to decide on the status of it's parent - 
//StatusChecker.getInstance().onGroupStatusChange(groupId, appId); } }); @@ -257,8 +247,6 @@ public class AutoscalerTopologyEventReceiver implements Runnable { //changing the status in the monitor, will notify its parent monitor monitor.setStatus(GroupStatus.Terminating); - //starting the status checker to decide on the status of it's parent - //StatusChecker.getInstance().onGroupStatusChange(groupId, appId); } }); @@ -278,8 +266,6 @@ public class AutoscalerTopologyEventReceiver implements Runnable { //changing the status in the monitor, will notify its parent monitor monitor.setStatus(GroupStatus.Terminated); - //starting the status checker to decide on the status of it's parent - //StatusChecker.getInstance().onGroupStatusChange(groupId, appId); } }); http://git-wip-us.apache.org/repos/asf/stratos/blob/fe38bdcf/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java index 1366bf8..7aef08e 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java @@ -285,7 +285,21 @@ public class StatusChecker { } else if (parent instanceof Group) { //send activation to the parent log.info("sending group terminated : " + parent.getUniqueIdentifier()); - //StatusEventPublisher.sendGroupInActivateEvent(appId, parent.getUniqueIdentifier()); + StatusEventPublisher.sendGroupTerminatedEvent(appId, parent.getUniqueIdentifier()); + } + } else if (groups.isEmpty() && clusterStatus == ClusterStatus.Terminating || + 
clusterData.isEmpty() && groupStatus == GroupStatus.Terminating || + groupStatus == GroupStatus.Terminating && clusterStatus == ClusterStatus.Terminating) { + //send the terminated event + if (parent instanceof Application) { + //send application activated event + log.info("sending app terminating: " + appId); + StatusEventPublisher.sendApplicationTerminatedEvent(appId, parent.getClusterDataRecursively()); + //StatusEventPublisher.sendApp(appId); + } else if (parent instanceof Group) { + //send activation to the parent + log.info("sending group terminating : " + parent.getUniqueIdentifier()); + StatusEventPublisher.sendGroupTerminatedEvent(appId, parent.getUniqueIdentifier()); } } else { log.warn("Clusters/groups not found in this [component] " + appId); http://git-wip-us.apache.org/repos/asf/stratos/blob/fe38bdcf/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/application/status/receiver/ApplicationStatusTopicReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/application/status/receiver/ApplicationStatusTopicReceiver.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/application/status/receiver/ApplicationStatusTopicReceiver.java index 3a3be45..604602b 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/application/status/receiver/ApplicationStatusTopicReceiver.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/application/status/receiver/ApplicationStatusTopicReceiver.java @@ -21,6 +21,7 @@ package org.apache.stratos.cloud.controller.application.status.receiver; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import 
org.apache.stratos.cloud.controller.topology.TopologyBuilder; +import org.apache.stratos.messaging.domain.topology.Cluster; import org.apache.stratos.messaging.event.Event; import org.apache.stratos.messaging.event.application.status.*; import org.apache.stratos.messaging.listener.application.status.*; @@ -71,6 +72,13 @@ public class ApplicationStatusTopicReceiver implements Runnable { } }); + statusEventReceiver.addEventListener(new ClusterInActivateEventListener() { + @Override + protected void onEvent(Event event) { + TopologyBuilder.handleClusterInActivateEvent((ClusterInActivateEvent) event); + } + }); + statusEventReceiver.addEventListener(new GroupActivatedEventListener() { @Override protected void onEvent(Event event) { @@ -79,6 +87,14 @@ public class ApplicationStatusTopicReceiver implements Runnable { } }); + statusEventReceiver.addEventListener(new GroupInactivateEventListener() { + @Override + protected void onEvent(Event event) { + TopologyBuilder.handleGroupInActiveEvent((GroupInactivateEvent) event); + + } + }); + statusEventReceiver.addEventListener(new GroupTerminatedEventListener() { @Override protected void onEvent(Event event) { http://git-wip-us.apache.org/repos/asf/stratos/blob/fe38bdcf/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java index 9b21c55..484cf99 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java @@ -44,6 
+44,7 @@ import org.apache.stratos.messaging.event.instance.status.InstanceMaintenanceMod import org.apache.stratos.messaging.event.instance.status.InstanceReadyToShutdownEvent; import org.apache.stratos.messaging.event.instance.status.InstanceStartedEvent; import org.apache.stratos.messaging.event.topology.*; +import org.apache.stratos.messaging.event.topology.ClusterInActivateEvent; import org.wso2.carbon.registry.core.exceptions.RegistryException; import java.util.*; @@ -774,6 +775,44 @@ public class TopologyBuilder { TopologyEventPublisher.sendClusterActivatedEvent(clusterActivatedEvent1); } + public static void handleClusterInActivateEvent( + org.apache.stratos.messaging.event.application.status.ClusterInActivateEvent clusterInActivateEvent) { + Topology topology = TopologyManager.getTopology(); + Service service = topology.getService(clusterInActivateEvent.getServiceName()); + //update the status of the cluster + if (service == null) { + log.warn(String.format("Service %s does not exist", + clusterInActivateEvent.getServiceName())); + return; + } + + Cluster cluster = service.getCluster(clusterInActivateEvent.getClusterId()); + if (cluster == null) { + log.warn(String.format("Cluster %s does not exist", + clusterInActivateEvent.getClusterId())); + return; + } + + ClusterInActivateEvent clusterActivatedEvent1 = + new ClusterInActivateEvent( + clusterInActivateEvent.getAppId(), + clusterInActivateEvent.getServiceName(), + clusterInActivateEvent.getClusterId()); + try { + TopologyManager.acquireWriteLock(); + //cluster.setStatus(Status.Activated); + cluster.setStatus(ClusterStatus.Inactive); + + log.info("Cluster in-active adding status started"); + + TopologyManager.updateTopology(topology); + } finally { + TopologyManager.releaseWriteLock(); + } + //publishing data + TopologyEventPublisher.sendClusterInActivateEvent(clusterActivatedEvent1); + } + public static void handleGroupActivatedEvent(GroupActivatedEvent groupActivatedEvent) { Topology topology = 
TopologyManager.getTopology(); Application application = topology.getApplication(groupActivatedEvent.getAppId()); @@ -1003,6 +1042,41 @@ public class TopologyBuilder { } } + public static void handleGroupInActiveEvent(GroupInactivateEvent event) { + Topology topology = TopologyManager.getTopology(); + Application application = topology.getApplication(event.getAppId()); + //update the status of the Group + if (application == null) { + log.warn(String.format("Application %s does not exist", + event.getAppId())); + return; + } + + Group group = application.getGroupRecursively(event.getGroupId()); + if (group == null) { + log.warn(String.format("Group %s does not exist", + event.getGroupId())); + return; + } + + GroupInActivateEvent groupInActivateEvent = + new GroupInActivateEvent( + event.getAppId(), + event.getGroupId()); + try { + TopologyManager.acquireWriteLock(); + group.setStatus(GroupStatus.Inactive); + log.info("Group in-active adding status started"); + + TopologyManager.updateTopology(topology); + } finally { + TopologyManager.releaseWriteLock(); + } + //publishing data + TopologyEventPublisher.sendGroupInActiveEvent(groupInActivateEvent); + } + + public static void handleGroupTerminatedEvent(GroupInTerminatedEvent event) { Topology topology = TopologyManager.getTopology(); Application application = topology.getApplication(event.getAppId()); http://git-wip-us.apache.org/repos/asf/stratos/blob/fe38bdcf/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java index 21d8267..9f69ee2 100644 --- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java @@ -204,6 +204,14 @@ public class TopologyEventPublisher { publishEvent(clusterActivatedEvent); } + public static void sendClusterInActivateEvent(ClusterInActivateEvent clusterInActiveEvent) { + if(log.isInfoEnabled()) { + log.info(String.format("Publishing cluster in-active event: [service] %s [cluster] %s [appId] %s", + clusterInActiveEvent.getServiceName(), clusterInActiveEvent.getClusterId() , clusterInActiveEvent.getAppId())); + } + publishEvent(clusterInActiveEvent); + } + public static void sendMemberTerminatedEvent(String serviceName, String clusterId, String networkPartitionId, String partitionId, String memberId, Properties properties, String groupId) { @@ -265,6 +273,15 @@ public class TopologyEventPublisher { publishEvent(applicationTerminatedEvent); } + public static void sendGroupInActiveEvent(GroupInActivateEvent groupInActivateEvent) { + if(log.isInfoEnabled()) { + log.info(String.format("Publishing group in-active event: [appId] %s", + groupInActivateEvent.getAppId())); + } + publishEvent(groupInActivateEvent); + } + + public static void sendGroupTerminatedEvent(GroupTerminatedEvent groupTerminatedTopologyEvent) { if(log.isInfoEnabled()) { log.info(String.format("Publishing group terminated event: [appId] %s",
