Adding deactivate method to Auto Scaler server component
Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/d0664e95 Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/d0664e95 Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/d0664e95 Branch: refs/heads/master Commit: d0664e95bf0dcfe2af2677f769d89b5d707c8826 Parents: d046aea Author: Udara Liyanage <[email protected]> Authored: Fri Dec 20 19:07:06 2013 -0500 Committer: Udara Liyanage <[email protected]> Committed: Fri Dec 20 19:07:06 2013 -0500 ---------------------------------------------------------------------- .../internal/AutoscalerServerComponent.java | 25 +++++++++++++------- .../health/HealthEventMessageDelegator.java | 8 ++++++- 2 files changed, 23 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/d0664e95/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java index a68800b..fba11f9 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java @@ -18,6 +18,9 @@ */ package org.apache.stratos.autoscaler.internal; +import java.util.Iterator; +import java.util.List; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.autoscaler.NetworkPartitionContext; @@ -30,8 +33,6 @@ import org.apache.stratos.autoscaler.policy.PolicyManager; import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy; import org.apache.stratos.autoscaler.registry.RegistryManager; import org.apache.stratos.autoscaler.topology.AutoscalerTopologyReceiver; -import org.apache.stratos.autoscaler.util.AutoScalerConstants; -import org.apache.stratos.autoscaler.util.Deserializer; import org.apache.stratos.autoscaler.util.ServiceReferenceHolder; import org.apache.stratos.cloud.controller.deployment.partition.Partition; import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber; @@ -40,10 +41,6 @@ import org.osgi.service.component.ComponentContext; import org.wso2.carbon.registry.api.RegistryException; import org.wso2.carbon.registry.core.service.RegistryService; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; - /** * @scr.component name=org.apache.stratos.autoscaler.internal.AutoscalerServerComponent" * immediate="true" @@ -57,18 +54,22 @@ import java.util.List; public class AutoscalerServerComponent { private static final Log log = LogFactory.getLog(AutoscalerServerComponent.class); + AutoscalerTopologyReceiver asTopologyReceiver; + TopicSubscriber healthStatTopicSubscriber; + HealthEventMessageDelegator healthEventMessageDelegator; protected void activate(ComponentContext componentContext) throws Exception { try { // Start topology receiver - Thread topologyTopicSubscriberThread = new Thread(new AutoscalerTopologyReceiver()); + asTopologyReceiver = new AutoscalerTopologyReceiver(); + Thread topologyTopicSubscriberThread = new Thread(asTopologyReceiver); topologyTopicSubscriberThread.start(); if (log.isDebugEnabled()) { log.debug("Topology receiver thread started"); } // Start health stat receiver - TopicSubscriber healthStatTopicSubscriber = new TopicSubscriber(Constants.HEALTH_STAT_TOPIC); + healthStatTopicSubscriber = new TopicSubscriber(Constants.HEALTH_STAT_TOPIC); healthStatTopicSubscriber.setMessageListener(new HealthEventMessageReceiver()); Thread healthStatTopicSubscriberThread = new Thread(healthStatTopicSubscriber); healthStatTopicSubscriberThread.start(); @@ -76,7 +77,7 @@ public class AutoscalerServerComponent { log.debug("Health event message receiver thread started"); } - HealthEventMessageDelegator healthEventMessageDelegator = new HealthEventMessageDelegator(); + healthEventMessageDelegator = new HealthEventMessageDelegator(); Thread healthDelegatorThread = new Thread(healthEventMessageDelegator); healthDelegatorThread.start(); if (log.isDebugEnabled()) { @@ -121,6 +122,12 @@ public class AutoscalerServerComponent { } } + protected void deactivate(ComponentContext context) { + asTopologyReceiver.terminate(); + healthStatTopicSubscriber.terminate(); + healthEventMessageDelegator.terminate(); + } + protected void setRegistryService(RegistryService registryService) { if (log.isDebugEnabled()) { log.debug("Setting the Registry Service"); http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/d0664e95/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java index 67e6701..0fa4ccd 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java @@ -54,12 +54,13 @@ import com.google.gson.stream.JsonReader; public class HealthEventMessageDelegator implements Runnable { private static final Log log = LogFactory.getLog(HealthEventMessageDelegator.class); + private boolean terminate = false; @Override public void run() { log.info("Health event message delegator started"); - while (true) { + while (!terminate) { try { TextMessage message = HealthEventQueue.getInstance().take(); @@ -207,6 +208,7 @@ public class HealthEventMessageDelegator implements Runnable { log.error("Failed to retrieve the health stat event message.", e); } } + log.warn("Health event Message delegater is terminated"); } private LoadAverage findLoadAverage(Event event) { @@ -434,4 +436,8 @@ public class HealthEventMessageDelegator implements Runnable { this.properties = properties; } } + + public void terminate(){ + this.terminate = true; + } }
