Repository: stratos Updated Branches: refs/heads/4.0.0-grouping fcd8a7ca4 -> 062cd7b40
fixing issue when terminating the cluster Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/062cd7b4 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/062cd7b4 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/062cd7b4 Branch: refs/heads/4.0.0-grouping Commit: 062cd7b404e2b4c506de72399b487a85855786f4 Parents: fcd8a7c Author: reka <[email protected]> Authored: Wed Oct 29 15:32:45 2014 +0530 Committer: reka <[email protected]> Committed: Wed Oct 29 15:34:22 2014 +0530 ---------------------------------------------------------------------- .../topic/InstanceNotificationPublisher.java | 11 + .../AutoscalerTopologyEventReceiver.java | 16 +- .../monitor/ParentComponentMonitor.java | 312 ++----------------- 3 files changed, 45 insertions(+), 294 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/062cd7b4/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/grouping/topic/InstanceNotificationPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/grouping/topic/InstanceNotificationPublisher.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/grouping/topic/InstanceNotificationPublisher.java index d745d79..80fa295 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/grouping/topic/InstanceNotificationPublisher.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/grouping/topic/InstanceNotificationPublisher.java @@ -25,6 +25,7 @@ import org.apache.stratos.messaging.broker.publish.EventPublisher; import org.apache.stratos.messaging.broker.publish.EventPublisherPool; import org.apache.stratos.messaging.event.Event; import org.apache.stratos.messaging.event.instance.notifier.InstanceCleanupClusterEvent; +import org.apache.stratos.messaging.event.instance.notifier.InstanceCleanupMemberEvent; import org.apache.stratos.messaging.util.Constants; public class InstanceNotificationPublisher { @@ -39,4 +40,14 @@ public class InstanceNotificationPublisher { log.info(String.format("Publishing Instance Cleanup Event: [cluster] %s", clusterId)); publish(new InstanceCleanupClusterEvent(clusterId)); } + + /** + * Publishing the instance termination notification to the instances + * + * @param memberId + */ + public void sendInstanceCleanupEventForMember(String memberId) { + log.info(String.format("Publishing Instance Cleanup Event: [member] %s", memberId)); + publish(new InstanceCleanupMemberEvent(memberId)); + } } http://git-wip-us.apache.org/repos/asf/stratos/blob/062cd7b4/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java index 8c2d1a1..d66795c 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java @@ -206,22 +206,24 @@ public class AutoscalerTopologyEventReceiver implements Runnable { log.info("[ClusterTerminatingEvent] Received: " + event.getClass()); ClusterTerminatingEvent clusterTerminatingEvent = (ClusterTerminatingEvent) event; - String appId = clusterTerminatingEvent.getAppId(); String clusterId = clusterTerminatingEvent.getClusterId(); AbstractClusterMonitor clusterMonitor = (AbstractClusterMonitor) AutoscalerContext.getInstance().getMonitor(clusterId); //changing the status in the monitor, will notify its parent monitor if (clusterMonitor != null) { - clusterMonitor.setDestroyed(true); - clusterMonitor.terminateAllMembers(); - clusterMonitor.setStatus(ClusterStatus.Terminating); + if (clusterMonitor.getStatus() == ClusterStatus.Active) { + // terminated gracefully + clusterMonitor.setStatus(ClusterStatus.Terminating); + InstanceNotificationPublisher.sendInstanceCleanupEventForCluster(clusterId); + } else { + clusterMonitor.setStatus(ClusterStatus.Terminating); + clusterMonitor.terminateAllMembers(); + } + } else { log.warn("No Cluster Monitor found for cluster id " + clusterId); } - - //starting the status checker to decide on the status of it's parent - //StatusChecker.getInstance().onClusterStatusChange(clusterId, appId); } }); http://git-wip-us.apache.org/repos/asf/stratos/blob/062cd7b4/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ParentComponentMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ParentComponentMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ParentComponentMonitor.java index 7ad0a43..f6857f7 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ParentComponentMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ParentComponentMonitor.java @@ -44,12 +44,8 @@ import java.util.List; public abstract class ParentComponentMonitor extends Monitor { private static final Log log = LogFactory.getLog(ParentComponentMonitor.class); - //id of the monitor, it can be alias or the id - //protected String id; - //The monitors dependency tree with all the startable/killable dependencies + //The monitors dependency tree with all the start-able/kill-able dependencies protected DependencyTree dependencyTree; - //Application id of this particular monitor - //protected String appId; public ParentComponentMonitor(ParentComponent component) throws DependencyBuilderException { aliasToActiveMonitorsMap = new HashMap<String, Monitor>(); @@ -68,6 +64,11 @@ public abstract class ParentComponentMonitor extends Monitor { protected abstract void monitor(MonitorStatusEvent statusEvent); + /** + * This will start the child monitors based on the active of siblings according to start up order + * + * @param idOfEvent parent id of the event which received + */ protected void onChildActivatedEvent(String idOfEvent) { try { //if the activated monitor is in in_active map move it to active map @@ -89,7 +90,7 @@ public abstract class ParentComponentMonitor extends Monitor { } - protected void onChildTerminatingEvent(String idOfEvent) { + /*protected void onChildTerminatingEvent(String idOfEvent) { //Check whether hasDependent true if (!this.aliasToInActiveMonitorsMap.containsKey(idOfEvent)) { this.aliasToInActiveMonitorsMap.put(idOfEvent, this.aliasToActiveMonitorsMap.remove(idOfEvent)); @@ -111,7 +112,7 @@ public abstract class ParentComponentMonitor extends Monitor { } else { log.warn("Inactive Monitor not found for the id " + idOfEvent); } - } + }*/ @Override public void onParentEvent(MonitorStatusEvent statusEvent) { @@ -146,6 +147,10 @@ public abstract class ParentComponentMonitor extends Monitor { } } + /** + * + * @param idOfEvent + */ protected void onChildInActiveEvent(String idOfEvent) { List<ApplicationContext> terminationList; Monitor monitor; @@ -179,10 +184,12 @@ public abstract class ParentComponentMonitor extends Monitor { if (monitor != null) { if (monitor.hasActiveMonitors()) { //it is a group - StatusEventPublisher.sendGroupTerminatingEvent(this.appId, terminationContext.getId()); + StatusEventPublisher.sendGroupTerminatingEvent(this.appId, + terminationContext.getId()); } else { StatusEventPublisher.sendClusterTerminatingEvent(this.appId, - ((AbstractClusterMonitor) monitor).getServiceId(), terminationContext.getId()); + ((AbstractClusterMonitor) monitor).getServiceId(), + terminationContext.getId()); } } else { log.warn("The relevant [monitor] " + terminationContext.getId() + @@ -191,8 +198,8 @@ public abstract class ParentComponentMonitor extends Monitor { } } else { - log.warn("Wrong inActive event received from [Child] " + idOfEvent + " to the [parent]" - + " where child is identified as a independent"); + log.warn("Wrong inActive event received from [Child] " + idOfEvent + + " to the [parent]" + " where child is identified as a independent"); } } @@ -216,13 +223,14 @@ public abstract class ParentComponentMonitor extends Monitor { */ if (terminationList != null) { for (ApplicationContext context1 : terminationList) { - if (!this.aliasToInActiveMonitorsMap.containsKey(context1.getId())) { - allDependentTerminated = false; - } else if (this.aliasToActiveMonitorsMap.containsKey(context1.getId())) { + if (this.aliasToActiveMonitorsMap.containsKey(context1.getId())) { log.warn("Dependent [monitor] " + context1.getId() + " not in the correct state"); allDependentTerminated = false; - } else { + } else if (this.aliasToInActiveMonitorsMap.containsKey(context1.getId())) { log.info("Waiting for the [dependent] " + context1.getId() + " to be terminated..."); + allDependentTerminated = false; + } else { + allDependentTerminated = true; } } } @@ -251,11 +259,11 @@ public abstract class ParentComponentMonitor extends Monitor { if ((terminationList != null && allDependentTerminated || terminationList == null) && (parentContexts != null && canStart || parentContexts == null)) { //Find the non existent monitor by traversing dependency tree - try { this.startDependencyOnTermination(); } catch (TopologyInConsistentException e) { - e.printStackTrace(); + //TODO revert the siblings and notify parent, change a flag for reverting/un-subscription + log.error("Error while starting the monitor upon termination" + e); } } else { StatusChecker.getInstance().onChildStatusChange(idOfEvent, this.id, this.appId); @@ -428,274 +436,4 @@ public abstract class ParentComponentMonitor extends Monitor { } } } - - - /*protected synchronized void startGroupMonitor(ParentComponentMonitor parent, GroupContext groupContext) { - Thread th = null; - //String groupId = group.getUniqueIdentifier(); - if (!this.aliasToActiveMonitorsMap.containsKey(groupId)) { - if (log.isDebugEnabled()) { - log.debug(String - .format("Group monitor Adder has been added: [group] %s ", - groupId)); - } - th = new Thread( - new GroupMonitorAdder(parent, groupId, this.appId)); - } - - if (th != null) { - th.start(); - *//*try { - th.join(); - } catch (InterruptedException ignore) { - }*//* - - log.info(String - .format("Group monitor thread has been started successfully: [group] %s ", - groupId)); - } - } -*/ - - /*private Group getGroupFromTopology(String groupId) throws TopologyInConsistentException { - Application application = TopologyManager.getTopology().getApplication(this.appId); - if(application != null) { - Group group = application.getGroupRecursively(groupId); - if(group != null) { - return group; - } else { - String msg = "[Group] " + groupId + " cannot be found in the Topology"; - throw new TopologyInConsistentException(msg); - } - } else { - String msg = "[Application] " + this.appId + " cannot be found in the Topology"; - throw new TopologyInConsistentException(msg); - } - }*/ - - /*protected synchronized void startClusterMonitor(ParentComponentMonitor parent, ApplicationContext clusterContext) { - Thread th = null; - if (!this.aliasToActiveMonitorsMap.containsKey(clusterContext.getId())) { - th = new Thread( - new ClusterMonitorAdder(parent, clusterContext)); - if (log.isDebugEnabled()) { - log.debug(String - .format("Cluster monitor Adder has been added: [cluster] %s ", - clusterContext.getClusterId())); - } - } - if (th != null) { - th.start(); - log.info(String - .format("Cluster monitor thread has been started successfully: [cluster] %s ", - clusterContext.getClusterId())); - } - }*/ - - - /*public Map<String, AbstractClusterMonitor> getClusterIdToClusterMonitorsMap() { - return clusterIdToClusterMonitorsMap; - } - - public void setClusterIdToClusterMonitorsMap(Map<String, AbstractClusterMonitor> clusterIdToClusterMonitorsMap) { - this.clusterIdToClusterMonitorsMap = clusterIdToClusterMonitorsMap; - } - - public void addAbstractMonitor(AbstractClusterMonitor monitor) { - this.clusterIdToClusterMonitorsMap.put(monitor.getClusterId(), monitor); - } - - public AbstractClusterMonitor getAbstractMonitor(String clusterId) { - return this.clusterIdToClusterMonitorsMap.get(clusterId); - } -*/ - - - /*private class ClusterMonitorAdder implements Runnable { - private Cluster cluster; - private ParentComponentMonitor parent; - - public ClusterMonitorAdder(ParentComponentMonitor parent, Cluster cluster) { - this.parent = parent; - this.cluster = cluster; - } - - public void run() { - ClusterMonitor monitor = null; - int retries = 5; - boolean success = false; - do { - try { - Thread.sleep(5000); - } catch (InterruptedException e1) { - } - try { - if (log.isDebugEnabled()) { - log.debug("CLuster monitor is going to be started for [cluster] " - + cluster.getClusterId()); - } - monitor = AutoscalerUtil.getClusterMonitor(cluster); - monitor.setParent(parent); - //setting the status of cluster monitor w.r.t Topology cluster - //if(cluster.getStatus() != Status.Created && - if(cluster.getStatus() != monitor.getStatus()) { - //updating the status, so that it will notify the parent - monitor.setStatus(cluster.getStatus()); - } - //monitor.addObserver(parent); - success = true; - //TODO start the status checker - } catch (PolicyValidationException e) { - String msg = "Cluster monitor creation failed for cluster: " + cluster.getClusterId(); - log.warn(msg, e); - retries--; - - - } catch (PartitionValidationException e) { - String msg = "Cluster monitor creation failed for cluster: " + cluster.getClusterId(); - log.warn(msg, e); - retries--; - - } - - } while (!success && retries != 0); - - - if (monitor == null) { - String msg = "Cluster monitor creation failed, even after retrying for 5 times, " - + "for cluster: " + cluster.getClusterId(); - log.error(msg); - //TODO parent.notify(); - throw new RuntimeException(msg); - } - - Thread th = new Thread(monitor); - th.start(); - - AutoscalerContext.getInstance().addMonitor(monitor); - aliasToActiveMonitorsMap.put(cluster.getClusterId(), monitor); - if (log.isInfoEnabled()) { - log.info(String.format("Cluster monitor has been added successfully: [cluster] %s", - cluster.getClusterId())); - } - } - } -*/ - - - /*private class GroupMonitorAdder implements Runnable { - private ParentComponentMonitor parent; - private String groupId; - private String appId; - - public GroupMonitorAdder(ParentComponentMonitor parent, String groupId, String appId) { - this.parent = parent; - this.groupId = groupId; - this.appId = appId; - } - - public void run() { - GroupMonitor monitor = null; - int retries = 5; - boolean success = false; - do { - try { - Thread.sleep(5000); - } catch (InterruptedException e1) { - } - - try { - if (log.isDebugEnabled()) { - log.debug("Group monitor is going to be started for [group] " - + groupId ); - } - monitor = AutoscalerUtil.getGroupMonitor(groupId, appId); - //setting the parent monitor - monitor.setParent(parent); - //setting the status of cluster monitor w.r.t Topology cluster - //if(group.getStatus() != Status.Created && - - //monitor.addObserver(parent); - success = true; - } catch (DependencyBuilderException e) { - String msg = "Group monitor creation failed for group: " + groupId; - log.warn(msg, e); - retries--; - } catch (TopologyInConsistentException e) { - String msg = "Group monitor creation failed for group: " + groupId; - log.warn(msg, e); - retries--; - } - } while (!success && retries != 0); - - if (monitor == null) { - String msg = "Group monitor creation failed, even after retrying for 5 times, " - + "for group: " + groupId; - log.error(msg); - //TODO parent.notify(); as it got to failed - - throw new RuntimeException(msg); - } - - aliasToActiveMonitorsMap.put(groupId, monitor); - //parent.addObserver(monitor); - - if (log.isInfoEnabled()) { - log.info(String.format("Group monitor has been added successfully: [group] %s", - groupId)); - } - } - } - - private class LBClusterMonitorAdder implements Runnable { - private Cluster cluster; - - public LBClusterMonitorAdder(Cluster cluster) { - this.cluster = cluster; - } - - public void run() { - LbClusterMonitor monitor = null; - int retries = 5; - boolean success = false; - do { - try { - Thread.sleep(5000); - } catch (InterruptedException e1) { - } - try { - monitor = AutoscalerUtil.getLBClusterMonitor(cluster); - success = true; - - } catch (PolicyValidationException e) { - String msg = "LB Cluster monitor creation failed for cluster: " + cluster.getClusterId(); - log.warn(msg, e); - retries--; - - } catch (PartitionValidationException e) { - String msg = "LB Cluster monitor creation failed for cluster: " + cluster.getClusterId(); - log.warn(msg, e); - retries--; - } - } while (!success && retries <= 0); - - if (monitor == null) { - String msg = "LB Cluster monitor creation failed, even after retrying for 5 times, " - + "for cluster: " + cluster.getClusterId(); - log.error(msg); - throw new RuntimeException(msg); - } - - Thread th = new Thread(monitor); - th.start(); - AutoscalerContext.getInstance().addLbMonitor(monitor); - aliasToActiveMonitorsMap.put(cluster.getClusterId(), monitor); - if (log.isInfoEnabled()) { - log.info(String.format("LB Cluster monitor has been added successfully: [cluster] %s", - cluster.getClusterId())); - } - } - }*/ - - }
