Repository: stratos Updated Branches: refs/heads/4.0.0-grouping 01315a745 -> 1b7064dae
adding flag upon member fault and refactoring statusChecker Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/1b7064da Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/1b7064da Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/1b7064da Branch: refs/heads/4.0.0-grouping Commit: 1b7064daee10880ddfbf5c1a848c0fd0f108fdc0 Parents: 01315a7 Author: reka <[email protected]> Authored: Wed Oct 29 11:33:06 2014 +0530 Committer: reka <[email protected]> Committed: Wed Oct 29 11:33:06 2014 +0530 ---------------------------------------------------------------------- .../grouping/topic/StatusEventPublisher.java | 86 ++++----- .../AutoscalerHealthStatEventReceiver.java | 6 +- .../AutoscalerTopologyEventReceiver.java | 15 +- .../monitor/AbstractClusterMonitor.java | 13 +- .../monitor/application/ApplicationMonitor.java | 4 +- .../monitor/cluster/ClusterMonitor.java | 3 +- .../autoscaler/monitor/group/GroupMonitor.java | 4 +- .../status/checker/StatusChecker.java | 188 +++++++++---------- 8 files changed, 155 insertions(+), 164 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/1b7064da/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/grouping/topic/StatusEventPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/grouping/topic/StatusEventPublisher.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/grouping/topic/StatusEventPublisher.java index 5b09a21..7bbe8ce 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/grouping/topic/StatusEventPublisher.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/grouping/topic/StatusEventPublisher.java @@ -19,18 +19,16 @@ public class StatusEventPublisher { private static final Log log = LogFactory.getLog(StatusEventPublisher.class); public static void sendClusterActivatedEvent(String appId, String serviceName, String clusterId) { - - if (log.isInfoEnabled()) { - log.info("Publishing Cluster activated event for [application]: " + appId + - " [cluster]: " + clusterId); - } - try { TopologyManager.acquireReadLockForCluster(serviceName, clusterId); Service service = TopologyManager.getTopology().getService(serviceName); if (service != null) { Cluster cluster = service.getCluster(clusterId); if (cluster.isStateTransitionValid(ClusterStatus.Active)) { + if (log.isInfoEnabled()) { + log.info("Publishing Cluster activated event for [application]: " + appId + + " [cluster]: " + clusterId); + } AppStatusClusterActivatedEvent clusterActivatedEvent = new AppStatusClusterActivatedEvent(appId, serviceName, clusterId); @@ -45,18 +43,16 @@ public class StatusEventPublisher { } public static void sendClusterInActivateEvent(String appId, String serviceName, String clusterId) { - - if (log.isInfoEnabled()) { - log.info("Publishing Cluster in-activate event for [application]: " + appId + - " [cluster]: " + clusterId); - } - try { TopologyManager.acquireReadLockForCluster(serviceName, clusterId); Service service = TopologyManager.getTopology().getService(serviceName); if (service != null) { Cluster cluster = service.getCluster(clusterId); if (cluster.isStateTransitionValid(ClusterStatus.Inactive)) { + if (log.isInfoEnabled()) { + log.info("Publishing Cluster in-activate event for [application]: " + appId + + " [cluster]: " + clusterId); + } AppStatusClusterInactivateEvent clusterInActivateEvent = new AppStatusClusterInactivateEvent(appId, serviceName, clusterId); @@ -73,16 +69,16 @@ public class StatusEventPublisher { public static void sendClusterTerminatingEvent(String appId, String serviceName, String clusterId) { - if (log.isInfoEnabled()) { - log.info("Publishing Cluster Terminating event for [application]: " + appId + - " [cluster]: " + clusterId); - } try { TopologyManager.acquireReadLockForCluster(serviceName, clusterId); Service service = TopologyManager.getTopology().getService(serviceName); if (service != null) { Cluster cluster = service.getCluster(clusterId); if (cluster.isStateTransitionValid(ClusterStatus.Terminating)) { + if (log.isInfoEnabled()) { + log.info("Publishing Cluster Terminating event for [application]: " + appId + + " [cluster]: " + clusterId); + } AppStatusClusterTerminatingEvent appStatusClusterTerminatingEvent = new AppStatusClusterTerminatingEvent(appId, serviceName, clusterId); @@ -99,17 +95,16 @@ public class StatusEventPublisher { } public static void sendClusterTerminatedEvent(String appId, String serviceName, String clusterId) { - - if (log.isInfoEnabled()) { - log.info("Publishing Cluster terminated event for [application]: " + appId + - " [cluster]: " + clusterId); - } try { TopologyManager.acquireReadLockForCluster(serviceName, clusterId); Service service = TopologyManager.getTopology().getService(serviceName); if (service != null) { Cluster cluster = service.getCluster(clusterId); if (cluster.isStateTransitionValid(ClusterStatus.Terminated)) { + if (log.isInfoEnabled()) { + log.info("Publishing Cluster terminated event for [application]: " + appId + + " [cluster]: " + clusterId); + } AppStatusClusterTerminatedEvent appStatusClusterTerminatedEvent = new AppStatusClusterTerminatedEvent(appId, serviceName, clusterId); @@ -125,18 +120,16 @@ public class StatusEventPublisher { } public static void sendGroupActivatedEvent(String appId, String groupId) { - - if (log.isInfoEnabled()) { - log.info("Publishing Group activated event for [application]: " + appId + - " [group]: " + groupId); - } - try { TopologyManager.acquireReadLockForApplication(appId); Application application = TopologyManager.getTopology().getApplication(appId); if (application != null) { Group group = application.getGroupRecursively(groupId); if (group.isStateTransitionValid(GroupStatus.Active)) { + if (log.isInfoEnabled()) { + log.info("Publishing Group activated event for [application]: " + appId + + " [group]: " + groupId); + } AppStatusGroupActivatedEvent groupActivatedEvent = new AppStatusGroupActivatedEvent(appId, groupId); @@ -151,17 +144,16 @@ public class StatusEventPublisher { } public static void sendGroupInActivateEvent(String appId, String groupId) { - - if (log.isInfoEnabled()) { - log.info("Publishing Group in-activate event for [application]: " + appId + - " [group]: " + groupId); - } try { TopologyManager.acquireReadLockForApplication(appId); Application application = TopologyManager.getTopology().getApplication(appId); if (application != null) { Group group = application.getGroupRecursively(groupId); if (group.isStateTransitionValid(GroupStatus.Inactive)) { + if (log.isInfoEnabled()) { + log.info("Publishing Group in-activate event for [application]: " + appId + + " [group]: " + groupId); + } AppStatusGroupInactivateEvent appStatusGroupInactivateEvent = new AppStatusGroupInactivateEvent(appId, groupId); @@ -176,17 +168,16 @@ public class StatusEventPublisher { } public static void sendGroupTerminatingEvent(String appId, String groupId) { - - if (log.isInfoEnabled()) { - log.info("Publishing Group terminating event for [application]: " + appId + - " [group]: " + groupId); - } try { TopologyManager.acquireReadLockForApplication(appId); Application application = TopologyManager.getTopology().getApplication(appId); if (application != null) { Group group = application.getGroupRecursively(groupId); if (group.isStateTransitionValid(GroupStatus.Terminating)) { + if (log.isInfoEnabled()) { + log.info("Publishing Group terminating event for [application]: " + appId + + " [group]: " + groupId); + } AppStatusGroupTerminatingEvent groupInTerminatingEvent = new AppStatusGroupTerminatingEvent(appId, groupId); publishEvent(groupInTerminatingEvent); @@ -227,16 +218,14 @@ public class StatusEventPublisher { } public static void sendApplicationActivatedEvent(String appId) { - - if (log.isInfoEnabled()) { - log.info("Publishing Application activated event for [application]: " + appId); - } - try { TopologyManager.acquireReadLockForApplication(appId); Application application = TopologyManager.getTopology().getApplication(appId); if (application != null) { if (application.isStateTransitionValid(ApplicationStatus.Active)) { + if (log.isInfoEnabled()) { + log.info("Publishing Application activated event for [application]: " + appId); + } AppStatusApplicationActivatedEvent applicationActivatedEvent = new AppStatusApplicationActivatedEvent(appId); @@ -273,15 +262,14 @@ public class StatusEventPublisher { } public static void sendApplicationTerminatingEvent(String appId) { - if (log.isInfoEnabled()) { - log.info("Publishing Application terminated event for [application]: " + appId); - } - try { TopologyManager.acquireReadLockForApplication(appId); Application application = TopologyManager.getTopology().getApplication(appId); if (application != null) { if (application.isStateTransitionValid(ApplicationStatus.Terminating)) { + if (log.isInfoEnabled()) { + log.info("Publishing Application terminated event for [application]: " + appId); + } AppStatusApplicationTerminatingEvent applicationTerminatingEvent = new AppStatusApplicationTerminatingEvent(appId); publishEvent(applicationTerminatingEvent); @@ -295,14 +283,14 @@ public class StatusEventPublisher { } public static void sendApplicationTerminatedEvent(String appId, Set<ClusterDataHolder> clusterData) { - if (log.isInfoEnabled()) { - log.info("Publishing Application terminated event for [application]: " + appId); - } try { TopologyManager.acquireReadLockForApplication(appId); Application application = TopologyManager.getTopology().getApplication(appId); if (application != null) { if (application.isStateTransitionValid(ApplicationStatus.Terminated)) { + if (log.isInfoEnabled()) { + log.info("Publishing Application terminated event for [application]: " + appId); + } AppStatusApplicationTerminatedEvent applicationTerminatedEvent = new AppStatusApplicationTerminatedEvent(appId, clusterData); publishEvent(applicationTerminatedEvent); http://git-wip-us.apache.org/repos/asf/stratos/blob/1b7064da/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java index 6986fbb..cf072f3 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java @@ -656,15 +656,15 @@ public class AutoscalerHealthStatEventReceiver implements Runnable { } return; } + //Check the clusterStatus as part of the member fault event + StatusChecker.getInstance().onMemberFaultEvent(clusterId, partitionCtxt); + // terminate the faulty member CloudControllerClient ccClient = CloudControllerClient.getInstance(); ccClient.terminate(memberId); // remove from active member list partitionCtxt.removeActiveMemberById(memberId); - //Check the clusterStatus as part of the member fault event - StatusChecker.getInstance().onMemberFaultEvent(clusterId, partitionCtxt); - if (log.isInfoEnabled()) { log.info(String.format("Faulty member is terminated and removed from the active members list: [member] %s [partition] %s [cluster] %s ", http://git-wip-us.apache.org/repos/asf/stratos/blob/1b7064da/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java index 1423385..7b411b8 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java @@ -155,7 +155,9 @@ public class AutoscalerTopologyEventReceiver implements Runnable { (AbstractClusterMonitor) AutoscalerContext.getInstance().getMonitor(clusterId); //changing the status in the monitor, will notify its parent monitor - clusterMonitor.setStatus(ClusterStatus.Active); + if(clusterMonitor!= null) { + clusterMonitor.setStatus(ClusterStatus.Active); + } } }); @@ -190,7 +192,9 @@ public class AutoscalerTopologyEventReceiver implements Runnable { (AbstractClusterMonitor) AutoscalerContext.getInstance().getMonitor(clusterId); //changing the status in the monitor, will notify its parent monitor - clusterMonitor.setStatus(ClusterStatus.Inactive); + if(clusterMonitor!= null) { + clusterMonitor.setStatus(ClusterStatus.Inactive); + } } }); @@ -237,9 +241,6 @@ public class AutoscalerTopologyEventReceiver implements Runnable { if (clusterMonitor != null) { clusterMonitor.setStatus(ClusterStatus.Terminated); } - - //starting the status checker to decide on the status of it's parent - //StatusChecker.getInstance().onClusterStatusChange(clusterId, appId); } }); @@ -337,7 +338,9 @@ public class AutoscalerTopologyEventReceiver implements Runnable { String appId = applicationActivatedEvent.getAppId(); ApplicationMonitor appMonitor = AutoscalerContext.getInstance().getAppMonitor(appId); - appMonitor.setStatus(ApplicationStatus.Active); + if(appMonitor != null) { + appMonitor.setStatus(ApplicationStatus.Active); + } } }); http://git-wip-us.apache.org/repos/asf/stratos/blob/1b7064da/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java index 0a9d628..1461b6e 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java @@ -65,6 +65,8 @@ abstract public class AbstractClusterMonitor extends Monitor implements Runnable protected String serviceId; protected String appId; + protected boolean hasFaultyMember = false; + protected ClusterStatus status; //protected ParentComponentMonitor parent; @@ -228,7 +230,7 @@ abstract public class AbstractClusterMonitor extends Monitor implements Runnable public void setStatus(ClusterStatus status) { log.info(String.format("[Monitor] %s is notifying the parent" + "on its state change from %s to %s", clusterId, this.status, status)); - if(this.status != status) { + //if(this.status != status) { this.status = status; /** * notifying the parent monitor about the state change @@ -241,7 +243,7 @@ abstract public class AbstractClusterMonitor extends Monitor implements Runnable } else { MonitorStatusEventBuilder.handleClusterStatusEvent(this.parent, this.status, this.clusterId); } - } + //} } @@ -260,4 +262,11 @@ abstract public class AbstractClusterMonitor extends Monitor implements Runnable } + public boolean isHasFaultyMember() { + return hasFaultyMember; + } + + public void setHasFaultyMember(boolean hasFaultyMember) { + this.hasFaultyMember = hasFaultyMember; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/1b7064da/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/application/ApplicationMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/application/ApplicationMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/application/ApplicationMonitor.java index 966c8b2..a147b35 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/application/ApplicationMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/application/ApplicationMonitor.java @@ -142,9 +142,9 @@ public class ApplicationMonitor extends ParentComponentMonitor { public void setStatus(ApplicationStatus status) { log.info(String.format("[ApplicationMonitor] %s " + "state changes from %s to %s", id, this.status, status)); - if(this.status != status) { + //if(this.status != status) { this.status = status; - } + //} } @Override http://git-wip-us.apache.org/repos/asf/stratos/blob/1b7064da/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java index a089fc6..d67b4f4 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java @@ -74,7 +74,8 @@ public class ClusterMonitor extends AbstractClusterMonitor { while (!isDestroyed()) { try { if ((this.status.getCode() <= ClusterStatus.Active.getCode()) || - (this.status == ClusterStatus.Inactive && !hasDependent)) { + (this.status == ClusterStatus.Inactive && !hasDependent) || + this.hasFaultyMember) { if (log.isDebugEnabled()) { log.debug("Cluster monitor is running.. " + this.toString()); } http://git-wip-us.apache.org/repos/asf/stratos/blob/1b7064da/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/group/GroupMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/group/GroupMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/group/GroupMonitor.java index ab3ef16..261f745 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/group/GroupMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/group/GroupMonitor.java @@ -140,7 +140,7 @@ public class GroupMonitor extends ParentComponentMonitor implements EventHandler public void setStatus(GroupStatus status) { log.info(String.format("[Monitor] %s is notifying the parent" + "on its state change from %s to %s", id, this.status, status)); - if(this.status != status) { + //if(this.status != status) { this.status = status; //notifying the parent if (status == GroupStatus.Inactive && !this.hasDependent) { @@ -149,7 +149,7 @@ public class GroupMonitor extends ParentComponentMonitor implements EventHandler } else { MonitorStatusEventBuilder.handleGroupStatusEvent(this.parent, this.status, this.id); } - } + //} } } http://git-wip-us.apache.org/repos/asf/stratos/blob/1b7064da/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java index acf6d71..d2d8b44 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java @@ -54,40 +54,48 @@ public class StatusChecker { * * @param clusterId id of the cluster */ - public void onMemberStatusChange(String clusterId) { - ClusterMonitor monitor = (ClusterMonitor) AutoscalerContext.getInstance().getMonitor(clusterId); - boolean clusterActive = false; - if(monitor != null) { - clusterActive = clusterActive(monitor); + public void onMemberStatusChange(final String clusterId) { + Runnable group = new Runnable() { + public void run() { + ClusterMonitor monitor = (ClusterMonitor) AutoscalerContext.getInstance().getMonitor(clusterId); + boolean clusterActive = false; + if (monitor != null) { + clusterActive = clusterActive(monitor); - } - log.info("Status checker running for [cluster] " + clusterId + - " the status [clusterActive] " + clusterActive); - // if active then notify upper layer - if (clusterActive) { - //send event to cluster status topic - StatusEventPublisher.sendClusterActivatedEvent(monitor.getAppId(), - monitor.getServiceId(), monitor.getClusterId()); - } + } + log.info("Status checker running for [cluster] " + clusterId + + " the status [clusterActive] " + clusterActive); + // if active then notify upper layer + if (clusterActive) { + //send event to cluster status topic + StatusEventPublisher.sendClusterActivatedEvent(monitor.getAppId(), + monitor.getServiceId(), monitor.getClusterId()); + } + } + }; + Thread groupThread = new Thread(group); + groupThread.start(); } - public void onMemberTermination(String clusterId) { - ClusterMonitor monitor = (ClusterMonitor) AutoscalerContext.getInstance().getMonitor(clusterId); - boolean clusterMonitorHasMembers = clusterMonitorHasMembers(monitor); - boolean clusterActive = clusterActive(monitor); - - try { - TopologyManager.acquireReadLockForCluster(monitor.getServiceId(), monitor.getClusterId()); - Service service = TopologyManager.getTopology().getService(monitor.getServiceId()); - Cluster cluster; - if(service != null) { - cluster = service.getCluster(monitor.getClusterId()); - if(cluster != null) { - if(!clusterMonitorHasMembers && cluster.getStatus() == ClusterStatus.Terminating) { - StatusEventPublisher.sendClusterTerminatedEvent(monitor.getAppId(), monitor.getServiceId(), - monitor.getClusterId()); - } else { - log.info("Cluster has non terminated [members] and in the [status] " + public void onMemberTermination(final String clusterId) { + Runnable group = new Runnable() { + public void run() { + ClusterMonitor monitor = (ClusterMonitor) AutoscalerContext.getInstance().getMonitor(clusterId); + boolean clusterMonitorHasMembers = clusterMonitorHasMembers(monitor); + boolean clusterActive = clusterActive(monitor); + + try { + TopologyManager.acquireReadLockForCluster(monitor.getServiceId(), monitor.getClusterId()); + Service service = TopologyManager.getTopology().getService(monitor.getServiceId()); + Cluster cluster; + if (service != null) { + cluster = service.getCluster(monitor.getClusterId()); + if (cluster != null) { + if (!clusterMonitorHasMembers && cluster.getStatus() == ClusterStatus.Terminating) { + StatusEventPublisher.sendClusterTerminatedEvent(monitor.getAppId(), monitor.getServiceId(), + monitor.getClusterId()); + } else { + log.info("Cluster has non terminated [members] and in the [status] " + cluster.getStatus().toString()); /*if(!clusterActive && !(cluster.getStatus() == ClusterStatus.Inactive || @@ -97,15 +105,19 @@ public class StatusChecker { monitor.getServiceId(), clusterId); }*/ + } + } } - } - } - } finally { - TopologyManager.releaseReadLockForCluster(monitor.getServiceId(), monitor.getClusterId()); + } finally { + TopologyManager.releaseReadLockForCluster(monitor.getServiceId(), monitor.getClusterId()); - } + } + } + }; + Thread groupThread = new Thread(group); + groupThread.start(); } @@ -147,21 +159,30 @@ public class StatusChecker { * @param partitionContext is to decide in which partition has less members while others have active members */ public void onMemberFaultEvent(final String clusterId, final PartitionContext partitionContext) { - ClusterMonitor monitor = (ClusterMonitor) AutoscalerContext.getInstance().getMonitor(clusterId); - boolean clusterInActive = getClusterInActive(monitor, partitionContext); - String appId = monitor.getAppId(); - if (clusterInActive) { - //TODO evaluate life cycle - //send cluster In-Active event to cluster status topic - StatusEventPublisher.sendClusterInActivateEvent(appId, monitor.getServiceId(), clusterId); + Runnable group = new Runnable() { + public void run() { + ClusterMonitor monitor = (ClusterMonitor) AutoscalerContext.getInstance().getMonitor(clusterId); + boolean clusterInActive = getClusterInActive(monitor, partitionContext); + String appId = monitor.getAppId(); + if (clusterInActive) { + //if the monitor is dependent, temporarily pausing it + if(monitor.isDependent()) { + monitor.setHasFaultyMember(true); + } + //send cluster In-Active event to cluster status topic + StatusEventPublisher.sendClusterInActivateEvent(appId, monitor.getServiceId(), clusterId); + + } else { + boolean clusterActive = clusterActive(monitor); + if (clusterActive) { + StatusEventPublisher.sendClusterActivatedEvent(appId, monitor.getServiceId(), clusterId); + } + } - } else { - boolean clusterActive = clusterActive(monitor); - if (clusterActive) { - //TODO evaluate life cycle - //send clusterActive event to cluster status topic } - } + }; + Thread groupThread = new Thread(group); + groupThread.start(); } private boolean getClusterInActive(AbstractClusterMonitor monitor, PartitionContext partitionContext) { @@ -195,7 +216,7 @@ public class StatusChecker { try { TopologyManager.acquireReadLockForApplication(appId); ParentComponent component; - if(groupId.equals(appId)) { + if (groupId.equals(appId)) { //it is an application component = TopologyManager.getTopology(). getApplication(appId); @@ -244,29 +265,6 @@ public class StatusChecker { log.info("cluster found: " + clusterFound); if (clusterFound || groups.containsKey(id)) { childFound = true; - /*if (!clusterData.isEmpty() && !groups.isEmpty()) { - if (log.isDebugEnabled()) { - log.debug("group active found: " + clusterFound); - } - - if (log.isDebugEnabled()) { - log.debug("Active cluster" + clustersActive + " and group: " + groupActive); - } - groupActive = clustersActive == ClusterStatus.Active && groupsActive == GroupStatus.Active; - } else if (!groups.isEmpty()) { - groupsActive = getGroupStatus(groups); - if (log.isDebugEnabled()) { - log.info("group active found: " + clusterFound); - } - groupActive = groupsActive == GroupStatus.Active; - } else if (!clusterData.isEmpty()) { - clustersActive = getClusterStatus(clusterData); - if (log.isDebugEnabled()) { - log.debug("Active cluster" + clustersActive + " and group: " + groupActive); - } - groupActive = clustersActive == ClusterStatus.Active; - } */ - clusterStatus = getClusterStatus(clusterData); groupStatus = getGroupStatus(groups); @@ -286,10 +284,10 @@ public class StatusChecker { } else if (groups.isEmpty() && clusterStatus == ClusterStatus.Inactive || clusterData.isEmpty() && groupStatus == GroupStatus.Inactive || groupStatus == GroupStatus.Inactive && clusterStatus == ClusterStatus.Inactive) { - //send the in activation event - if (parent instanceof Application) { - //send application activated event - log.warn("Application can't be in in-active : " + appId); + //send the in activation event + if (parent instanceof Application) { + //send application activated event + log.warn("Application can't be in in-active : " + appId); //StatusEventPublisher.sendApplicationInactivatedEvent(appId); } else if (parent instanceof Group) { //send activation to the parent @@ -299,13 +297,13 @@ public class StatusChecker { } else if (groups.isEmpty() && clusterStatus == ClusterStatus.Terminated || clusterData.isEmpty() && groupStatus == GroupStatus.Terminated || groupStatus == GroupStatus.Terminated && clusterStatus == ClusterStatus.Terminated) { - //send the terminated event + //send the terminated event if (parent instanceof Application) { //validating the life cycle try { TopologyManager.acquireReadLockForApplication(appId); Application application = TopologyManager.getTopology().getApplication(appId); - if(application.getStatus().equals(ApplicationStatus.Terminating)) { + if (application.getStatus().equals(ApplicationStatus.Terminating)) { log.info("sending app terminated: " + appId); StatusEventPublisher.sendApplicationTerminatedEvent(appId, parent.getClusterDataRecursively()); } else { @@ -351,33 +349,25 @@ public class StatusChecker { } private GroupStatus getGroupStatus(Map<String, Group> groups) { - boolean groupActiveStatus = false; GroupStatus status = null; boolean groupActive = false; boolean groupTerminated = false; for (Group group : groups.values()) { - /*if (group.getTempStatus() == Status.Activated) { - groupActiveStatus = true; - } else { - groupActiveStatus = false; - break; - }*/ - if (group.getStatus() == GroupStatus.Active) { groupActive = true; groupTerminated = false; - } else if(group.getStatus() == GroupStatus.Inactive){ + } else if (group.getStatus() == GroupStatus.Inactive) { status = GroupStatus.Inactive; break; - } else if(group.getStatus() == GroupStatus.Terminated) { + } else if (group.getStatus() == GroupStatus.Terminated) { groupActive = false; groupTerminated = true; - } else if(group.getStatus() == GroupStatus.Created) { + } else if (group.getStatus() == GroupStatus.Created) { groupActive = false; groupTerminated = false; status = GroupStatus.Created; - } else if(group.getStatus() == GroupStatus.Terminating) { + } else if (group.getStatus() == GroupStatus.Terminating) { groupActive = false; groupTerminated = false; status = GroupStatus.Terminating; @@ -385,9 +375,9 @@ public class StatusChecker { } } - if(groupActive) { + if (groupActive) { status = GroupStatus.Active; - } else if(groupTerminated) { + } else if (groupTerminated) { status = GroupStatus.Terminated; } return status; @@ -404,28 +394,28 @@ public class StatusChecker { if (cluster.getStatus() == ClusterStatus.Active) { clusterActive = true; clusterTerminated = false; - } else if(cluster.getStatus() == ClusterStatus.Inactive){ + } else if (cluster.getStatus() == ClusterStatus.Inactive) { status = ClusterStatus.Inactive; clusterActive = false; clusterTerminated = false; break; - } else if(cluster.getStatus() == ClusterStatus.Terminated) { + } else if (cluster.getStatus() == ClusterStatus.Terminated) { clusterActive = false; clusterTerminated = true; - } else if(cluster.getStatus() == ClusterStatus.Terminating) { + } else if (cluster.getStatus() == ClusterStatus.Terminating) { status = ClusterStatus.Terminating; clusterActive = false; clusterTerminated = false; - } else if(cluster.getStatus() == ClusterStatus.Created) { + } else if (cluster.getStatus() == ClusterStatus.Created) { status = ClusterStatus.Created; clusterActive = false; clusterTerminated = false; } } - if(clusterActive) { + if (clusterActive) { status = ClusterStatus.Active; - } else if(clusterTerminated) { + } else if (clusterTerminated) { status = ClusterStatus.Terminated; } return status;
