Updated Branches:
  refs/heads/master 41687d86d -> 253bbb08a

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/253bbb08/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java
deleted file mode 100644
index d1bcb1c..0000000
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java
+++ /dev/null
@@ -1,639 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.autoscaler.message.receiver.health;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.StringReader;
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.jms.TextMessage;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.*;
-import 
org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
-import org.apache.stratos.autoscaler.exception.SpawningException;
-import org.apache.stratos.autoscaler.exception.TerminationException;
-import org.apache.stratos.autoscaler.monitor.AbstractMonitor;
-import org.apache.stratos.autoscaler.policy.model.LoadAverage;
-import org.apache.stratos.autoscaler.policy.model.MemoryConsumption;
-import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.cloud.controller.deployment.partition.Partition;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Member;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
-
-import com.google.gson.stream.JsonReader;
-
-
-/**
- * A thread for processing topology messages and updating the topology data 
structure.
- */
-public class HealthEventMessageDelegator implements Runnable {
-
-    private static final Log log = 
LogFactory.getLog(HealthEventMessageDelegator.class);
-    private boolean terminate = false;
-
-    @Override
-    public void run() {
-        if(log.isInfoEnabled()) {
-            log.info("Health event message delegator started");
-        }
-
-        if(log.isDebugEnabled()) {
-            log.debug("Waiting for topology to be initialized");
-        }
-        while(!TopologyManager.getTopology().isInitialized());
-
-        while (!terminate) {
-            try {
-                TextMessage message = HealthEventQueue.getInstance().take();
-
-                String messageText = message.getText();
-                if (log.isDebugEnabled()) {
-                    log.debug("Health event message received: [message] " + 
messageText);
-                }
-                Event event = jsonToEvent(messageText);
-                String eventName = event.getEventName();
-
-                if (log.isInfoEnabled()) {
-                    log.info(String.format("Received event: [event-name] %s", 
eventName));
-                }
-
-                if (Constants.AVERAGE_REQUESTS_IN_FLIGHT.equals(eventName)) {
-                    String clusterId = event.getProperties().get("cluster_id");
-                    String networkPartitionId = 
event.getProperties().get("network_partition_id");
-                    String value = event.getProperties().get("value");
-                    Float floatValue = Float.parseFloat(value);
-
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("%s event: [cluster] %s 
[network-partition] %s [value] %s", eventName,
-                                clusterId, networkPartitionId, value));
-                    }
-
-                    AbstractMonitor monitor = 
AutoscalerContext.getInstance().getMonitor(clusterId);
-                    if(null != monitor){
-                        NetworkPartitionContext networkPartitionContext = 
monitor.getNetworkPartitionCtxt(networkPartitionId);
-                        if(null != networkPartitionContext){
-                            
networkPartitionContext.setAverageRequestsInFlight(floatValue);
-                        } else {
-                            if(log.isErrorEnabled()) {
-                               log.error(String.format("Network partition 
context is not available for :" +
-                                       " [network partition] %s", 
networkPartitionId));
-                            }
-                        }
-                    } else {
-
-                        if(log.isErrorEnabled()) {
-                           log.error(String.format("Cluster monitor is not 
available for : [cluster] %s", clusterId));
-                        }
-                    }
-
-                } else if 
(Constants.GRADIENT_OF_REQUESTS_IN_FLIGHT.equals(eventName)) {
-                    String clusterId = event.getProperties().get("cluster_id");
-                    String networkPartitionId = 
event.getProperties().get("network_partition_id");
-                    String value = event.getProperties().get("value");
-                    Float floatValue = Float.parseFloat(value);
-
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("%s event: [cluster] %s 
[network-partition] %s [value] %s", eventName,
-                                clusterId, networkPartitionId, value));
-                    }
-                    AbstractMonitor monitor = 
AutoscalerContext.getInstance().getMonitor(clusterId);
-                    if(null != monitor){
-                        NetworkPartitionContext networkPartitionContext = 
monitor.getNetworkPartitionCtxt(networkPartitionId);
-                        if(null != networkPartitionContext){
-                            
networkPartitionContext.setRequestsInFlightGradient(floatValue);
-                        } else {
-                            if(log.isErrorEnabled()) {
-                               log.error(String.format("Network partition 
context is not available for :" +
-                                       " [network partition] %s", 
networkPartitionId));
-                            }
-                        }
-                    } else {
-
-                        if(log.isErrorEnabled()) {
-                           log.error(String.format("Cluster monitor is not 
available for : [cluster] %s", clusterId));
-                        }
-                    }
-
-                } else if 
(Constants.SECOND_DERIVATIVE_OF_REQUESTS_IN_FLIGHT.equals(eventName)) {
-                    String clusterId = event.getProperties().get("cluster_id");
-                    String networkPartitionId = 
event.getProperties().get("network_partition_id");
-                    String value = event.getProperties().get("value");
-                    Float floatValue = Float.parseFloat(value);
-
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("%s event: [cluster] %s 
[network-partition] %s [value] %s", eventName,
-                                clusterId, networkPartitionId, value));
-                    }
-                    AbstractMonitor monitor = 
AutoscalerContext.getInstance().getMonitor(clusterId);
-                    if(null != monitor){
-                        NetworkPartitionContext networkPartitionContext = 
monitor.getNetworkPartitionCtxt(networkPartitionId);
-                        if(null != networkPartitionContext){
-                            
networkPartitionContext.setRequestsInFlightSecondDerivative(floatValue);
-                        } else {
-                            if(log.isErrorEnabled()) {
-                               log.error(String.format("Network partition 
context is not available for :" +
-                                       " [network partition] %s", 
networkPartitionId));
-                            }
-                        }
-                    } else {
-
-                        if(log.isErrorEnabled()) {
-                           log.error(String.format("Cluster monitor is not 
available for : [cluster] %s", clusterId));
-                        }
-                    }
-                } else if 
(Constants.MEMBER_FAULT_EVENT_NAME.equals(eventName)) {
-                    String clusterId = event.getProperties().get("cluster_id");
-                    String memberId = event.getProperties().get("member_id");
-
-                    if (memberId == null || memberId.isEmpty()) {
-                        if(log.isErrorEnabled()) {
-                            log.error("Member id not found in received 
message");
-                        }
-                    } else {
-                        handleMemberFaultEvent(clusterId, memberId);
-                    }
-                } else 
if(Constants.MEMBER_AVERAGE_LOAD_AVERAGE.equals(eventName)) {
-                    LoadAverage loadAverage = findLoadAverage(event);
-                    if(loadAverage != null) {
-                        String value = event.getProperties().get("value");
-                        Float floatValue = Float.parseFloat(value);
-                        loadAverage.setAverage(floatValue);
-
-                        if (log.isDebugEnabled()) {
-                            log.debug(String.format("%s event: [member] %s 
[value] %s", event, event.getProperties().get("member_id"), value));
-                        }
-                    }
-                } else 
if(Constants.MEMBER_SECOND_DERIVATIVE_OF_LOAD_AVERAGE.equals(eventName)) {
-                    LoadAverage loadAverage = findLoadAverage(event);
-                    if(loadAverage != null) {
-                        String value = event.getProperties().get("value");
-                        Float floatValue = Float.parseFloat(value);
-                        loadAverage.setSecondDerivative(floatValue);
-
-                        if (log.isDebugEnabled()) {
-                            log.debug(String.format("%s event: [member] %s 
[value] %s", event, event.getProperties().get("member_id"), value));
-                        }
-                    }
-                } else 
if(Constants.MEMBER_GRADIENT_LOAD_AVERAGE.equals(eventName)) {
-                    LoadAverage loadAverage = findLoadAverage(event);
-                    if(loadAverage != null) {
-                        String value = event.getProperties().get("value");
-                        Float floatValue = Float.parseFloat(value);
-                        loadAverage.setGradient(floatValue);
-
-                        if (log.isDebugEnabled()) {
-                            log.debug(String.format("%s event: [member] %s 
[value] %s", event, event.getProperties().get("member_id"), value));
-                        }
-                    }
-                } else 
if(Constants.MEMBER_AVERAGE_MEMORY_CONSUMPTION.equals(eventName)) {
-                    MemoryConsumption memoryConsumption = 
findMemoryConsumption(event);
-                    if(memoryConsumption != null) {
-                        String value = event.getProperties().get("value");
-                        Float floatValue = Float.parseFloat(value);
-                        memoryConsumption.setAverage(floatValue);
-
-                        if (log.isDebugEnabled()) {
-                            log.debug(String.format("%s event: [member] %s 
[value] %s", event, event.getProperties().get("member_id"), value));
-                        }
-                    }
-                } else 
if(Constants.MEMBER_SECOND_DERIVATIVE_OF_MEMORY_CONSUMPTION.equals(eventName)) {
-                    MemoryConsumption memoryConsumption = 
findMemoryConsumption(event);
-                    if(memoryConsumption != null) {
-                        String value = event.getProperties().get("value");
-                        Float floatValue = Float.parseFloat(value);
-                        memoryConsumption.setSecondDerivative(floatValue);
-
-                        if (log.isDebugEnabled()) {
-                            log.debug(String.format("%s event: [member] %s 
[value] %s", event, event.getProperties().get("member_id"), value));
-                        }
-                    }
-                } else 
if(Constants.MEMBER_GRADIENT_MEMORY_CONSUMPTION.equals(eventName)) {
-                    MemoryConsumption memoryConsumption = 
findMemoryConsumption(event);
-                    if(memoryConsumption != null) {
-                        String value = event.getProperties().get("value");
-                        Float floatValue = Float.parseFloat(value);
-                        memoryConsumption.setGradient(floatValue);
-
-                        if (log.isDebugEnabled()) {
-                            log.debug(String.format("%s event: [member] %s 
[value] %s",event, event.getProperties().get("member_id"), value));
-                        }
-                    }
-
-                } else if(Constants.AVERAGE_LOAD_AVERAGE.equals(eventName)) {
-
-                    String clusterId = event.getProperties().get("cluster_id");
-                    String networkPartitionId = 
event.getProperties().get("network_partition_id");
-                    String value = event.getProperties().get("value");
-                    Float floatValue = Float.parseFloat(value);
-
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("%s event: [cluster] %s 
[network-partition] %s [value] %s", eventName,
-                                clusterId, networkPartitionId, value));
-                    }
-                    AbstractMonitor monitor = 
AutoscalerContext.getInstance().getMonitor(clusterId);
-                    if(null != monitor){
-                        NetworkPartitionContext networkPartitionContext = 
monitor.getNetworkPartitionCtxt(networkPartitionId);
-                        if(null != networkPartitionContext){
-                            
networkPartitionContext.setAverageLoadAverage(floatValue);
-                        } else {
-                            if(log.isErrorEnabled()) {
-                               log.error(String.format("Network partition 
context is not available for :" +
-                                       " [network partition] %s", 
networkPartitionId));
-                            }
-                        }
-                    } else {
-
-                        if(log.isErrorEnabled()) {
-                           log.error(String.format("Cluster monitor is not 
available for : [cluster] %s", clusterId));
-                        }
-                    }
-                } else 
if(Constants.SECOND_DERIVATIVE_OF_LOAD_AVERAGE.equals(eventName)) {
-
-                    String clusterId = event.getProperties().get("cluster_id");
-                    String networkPartitionId = 
event.getProperties().get("network_partition_id");
-                    String value = event.getProperties().get("value");
-                    Float floatValue = Float.parseFloat(value);
-
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("%s event: [cluster] %s 
[network-partition] %s [value] %s", eventName,
-                                clusterId, networkPartitionId, value));
-                    }
-                    AbstractMonitor monitor = 
AutoscalerContext.getInstance().getMonitor(clusterId);
-                    if(null != monitor){
-                        NetworkPartitionContext networkPartitionContext = 
monitor.getNetworkPartitionCtxt(networkPartitionId);
-                        if(null != networkPartitionContext){
-                            
networkPartitionContext.setLoadAverageSecondDerivative(floatValue);
-                        } else {
-                            if(log.isErrorEnabled()) {
-                               log.error(String.format("Network partition 
context is not available for :" +
-                                       " [network partition] %s", 
networkPartitionId));
-                            }
-                        }
-                    } else {
-
-                        if(log.isErrorEnabled()) {
-                           log.error(String.format("Cluster monitor is not 
available for : [cluster] %s", clusterId));
-                        }
-                    }
-                } else if(Constants.GRADIENT_LOAD_AVERAGE.equals(eventName)) {
-
-                    String clusterId = event.getProperties().get("cluster_id");
-                    String networkPartitionId = 
event.getProperties().get("network_partition_id");
-                    String value = event.getProperties().get("value");
-                    Float floatValue = Float.parseFloat(value);
-
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("%s event: [cluster] %s 
[network-partition] %s [value] %s", eventName,
-                                clusterId, networkPartitionId, value));
-                    }
-                    AbstractMonitor monitor = 
AutoscalerContext.getInstance().getMonitor(clusterId);
-                    if(null != monitor){
-                        NetworkPartitionContext networkPartitionContext = 
monitor.getNetworkPartitionCtxt(networkPartitionId);
-                        if(null != networkPartitionContext){
-                            
networkPartitionContext.setLoadAverageGradient(floatValue);
-                        } else {
-                            if(log.isErrorEnabled()) {
-                               log.error(String.format("Network partition 
context is not available for :" +
-                                       " [network partition] %s", 
networkPartitionId));
-                            }
-                        }
-                    } else {
-
-                        if(log.isErrorEnabled()) {
-                           log.error(String.format("Cluster monitor is not 
available for : [cluster] %s", clusterId));
-                        }
-                    }
-                } else 
if(Constants.AVERAGE_MEMORY_CONSUMPTION.equals(eventName)) {
-
-                    String clusterId = event.getProperties().get("cluster_id");
-                    String networkPartitionId = 
event.getProperties().get("network_partition_id");
-                    String value = event.getProperties().get("value");
-                    Float floatValue = Float.parseFloat(value);
-
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("%s event: [cluster] %s 
[network-partition] %s [value] %s", eventName,
-                                clusterId, networkPartitionId, value));
-                    }
-                    AbstractMonitor monitor = 
AutoscalerContext.getInstance().getMonitor(clusterId);
-                    if(null != monitor){
-                        NetworkPartitionContext networkPartitionContext = 
monitor.getNetworkPartitionCtxt(networkPartitionId);
-                        if(null != networkPartitionContext){
-                            
networkPartitionContext.setAverageMemoryConsumption(floatValue);
-                        } else {
-                            if(log.isErrorEnabled()) {
-                               log.error(String.format("Network partition 
context is not available for :" +
-                                       " [network partition] %s", 
networkPartitionId));
-                            }
-                        }
-                    } else {
-
-                        if(log.isErrorEnabled()) {
-                           log.error(String.format("Cluster monitor is not 
available for : [cluster] %s", clusterId));
-                        }
-                    }
-                } else 
if(Constants.SECOND_DERIVATIVE_OF_MEMORY_CONSUMPTION.equals(eventName)) {
-
-                    String clusterId = event.getProperties().get("cluster_id");
-                    String networkPartitionId = 
event.getProperties().get("network_partition_id");
-                    String value = event.getProperties().get("value");
-                    Float floatValue = Float.parseFloat(value);
-
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("%s event: [cluster] %s 
[network-partition] %s [value] %s", eventName,
-                                clusterId, networkPartitionId, value));
-                    }
-                    AbstractMonitor monitor = 
AutoscalerContext.getInstance().getMonitor(clusterId);
-                    if(null != monitor){
-                        NetworkPartitionContext networkPartitionContext = 
monitor.getNetworkPartitionCtxt(networkPartitionId);
-                        if(null != networkPartitionContext){
-                            
networkPartitionContext.setMemoryConsumptionSecondDerivative(floatValue);
-                        } else {
-                            if(log.isErrorEnabled()) {
-                               log.error(String.format("Network partition 
context is not available for :" +
-                                       " [network partition] %s", 
networkPartitionId));
-                            }
-                        }
-                    } else {
-
-                        if(log.isErrorEnabled()) {
-                           log.error(String.format("Cluster monitor is not 
available for : [cluster] %s", clusterId));
-                        }
-                    }
-                } else 
if(Constants.GRADIENT_MEMORY_CONSUMPTION.equals(eventName)) {
-
-                    String clusterId = event.getProperties().get("cluster_id");
-                    String networkPartitionId = 
event.getProperties().get("network_partition_id");
-                    String value = event.getProperties().get("value");
-                    Float floatValue = Float.parseFloat(value);
-
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("%s event: [cluster] %s 
[network-partition] %s [value] %s", eventName,
-                                clusterId, networkPartitionId, value));
-                    }
-                    AbstractMonitor monitor = 
AutoscalerContext.getInstance().getMonitor(clusterId);
-                    if(null != monitor){
-                        NetworkPartitionContext networkPartitionContext = 
monitor.getNetworkPartitionCtxt(networkPartitionId);
-                        if(null != networkPartitionContext){
-                            
networkPartitionContext.setMemoryConsumptionGradient(floatValue);
-                        } else {
-                            if(log.isErrorEnabled()) {
-                               log.error(String.format("Network partition 
context is not available for :" +
-                                       " [network partition] %s", 
networkPartitionId));
-                            }
-                        }
-                    } else {
-
-                        if(log.isErrorEnabled()) {
-                           log.error(String.format("Cluster monitor is not 
available for : [cluster] %s", clusterId));
-                        }
-                    }
-                }
-            } catch (Exception e) {
-                log.error("Failed to retrieve the health stat event message.", 
e);
-            }
-        }
-        log.warn("Health event Message delegater is terminated");
-    }
-
-    private LoadAverage findLoadAverage(Event event) {
-        String memberId = event.getProperties().get("member_id");
-        Member member = findMember(memberId);
-        
-        if(null == member){
-               if(log.isErrorEnabled()) {
-                log.error(String.format("Member not found: [member] %s", 
memberId));
-            }
-               return null;
-        }
-        AbstractMonitor monitor = 
AutoscalerContext.getInstance().getMonitor(member.getClusterId());
-        if(null == monitor){
-            if(log.isErrorEnabled()) {
-               log.error(String.format("Cluster monitor is not available for : 
[member] %s", memberId));
-            }
-            return null;
-        }
-        String networkPartitionId = findNetworkPartitionId(memberId);
-        MemberStatsContext memberStatsContext = 
monitor.getNetworkPartitionCtxt(networkPartitionId)
-                        .getPartitionCtxt(member.getPartitionId())
-                        .getMemberStatsContext(memberId);
-        if(null == memberStatsContext){
-            if(log.isErrorEnabled()) {
-               log.error(String.format("Member context is not available for : 
[member] %s", memberId));
-            }
-            return null;
-        }
-        else if(!member.isActive()){
-            if(log.isDebugEnabled()){
-                log.debug(String.format("Member activated event has not 
received for the member %s. Therefore ignoring" +
-                        " the health stat", memberId));
-            }
-            return null;
-        }
-
-        LoadAverage loadAverage = memberStatsContext.getLoadAverage();
-        return loadAverage;
-    }
-
-    private MemoryConsumption findMemoryConsumption(Event event) {
-        String memberId = event.getProperties().get("member_id");
-        Member member = findMember(memberId);
-        
-        if(null == member){
-               if(log.isErrorEnabled()) {
-                log.error(String.format("Member not found: [member] %s", 
memberId));
-            }
-               return null;
-        }
-        
-        AbstractMonitor monitor = 
AutoscalerContext.getInstance().getMonitor(member.getClusterId());
-        if(null == monitor){
-            if(log.isErrorEnabled()) {
-               log.error(String.format("Cluster monitor is not available for : 
[member] %s", memberId));
-            }
-            return null;
-        }
-        
-        
-        String networkPartitionId = findNetworkPartitionId(memberId);
-        MemberStatsContext memberStatsContext = 
monitor.getNetworkPartitionCtxt(networkPartitionId)
-                        .getPartitionCtxt(member.getPartitionId())
-                        .getMemberStatsContext(memberId);
-        if(null == memberStatsContext){
-            if(log.isErrorEnabled()) {
-               log.error(String.format("Member context is not available for : 
[member] %s", memberId));
-            }
-            return null;
-        }else if(!member.isActive()){
-            if(log.isDebugEnabled()){
-                log.debug(String.format("Member activated event has not 
received for the member %s. Therefore ignoring" +
-                        " the health stat", memberId));
-            }
-            return null;
-        }
-        MemoryConsumption memoryConsumption = 
memberStatsContext.getMemoryConsumption();
-
-        return memoryConsumption;
-    }
-
-    private String findNetworkPartitionId(String memberId) {
-        for(Service service: TopologyManager.getTopology().getServices()){
-            for(Cluster cluster: service.getClusters()){
-                if(cluster.memberExists(memberId)){
-                    return cluster.getMember(memberId).getNetworkPartitionId();
-                }
-            }
-        }
-        return null;
-    }
-
-    private Member findMember(String memberId) {
-        try {
-            TopologyManager.acquireReadLock();
-            for(Service service : TopologyManager.getTopology().getServices()) 
{
-                for(Cluster cluster : service.getClusters()) {
-                    if(cluster.memberExists(memberId)) {
-                        return cluster.getMember(memberId);
-                    }
-                }
-            }
-            return null;
-        }
-        finally {
-            TopologyManager.releaseReadLock();
-        }
-    }
-
-    private void handleMemberFaultEvent(String clusterId, String memberId) {
-        try {
-               AutoscalerContext asCtx = AutoscalerContext.getInstance();
-               AbstractMonitor monitor;
-               
-               if(asCtx.moniterExist(clusterId)){
-                       monitor = asCtx.getMonitor(clusterId);
-               }else if(asCtx.lbMoniterExist(clusterId)){
-                       monitor = asCtx.getLBMonitor(clusterId);
-               }else{
-                       String errMsg = "A monitor is not found for this 
custer";
-                       log.error(errMsg);
-                       throw new RuntimeException(errMsg);
-               }
-               
-               NetworkPartitionContext nwPartitionCtxt;
-            try{
-               TopologyManager.acquireReadLock();
-               Member member = findMember(memberId);
-               
-               if(null == member){
-                       return;
-               }
-                if(!member.isActive()){
-                    if(log.isDebugEnabled()){
-                        log.debug(String.format("Member activated event has 
not received for the member %s. Therefore ignoring" +
-                                " the member fault health stat", memberId));
-                    }
-                    return;
-                }
-                   
-                   nwPartitionCtxt = monitor.getNetworkPartitionCtxt(member);
-                   
-            }finally{
-               TopologyManager.releaseReadLock();
-            }
-            // terminate the faulty member
-            CloudControllerClient ccClient = 
CloudControllerClient.getInstance();
-            ccClient.terminate(memberId);
-
-            // start a new member in the same Partition
-            String partitionId = monitor.getPartitionOfMember(memberId);
-            Partition partition = 
monitor.getDeploymentPolicy().getPartitionById(partitionId);
-            PartitionContext partitionCtxt = 
nwPartitionCtxt.getPartitionCtxt(partitionId);
-            
-            String lbClusterId = 
AutoscalerRuleEvaluator.getLbClusterId(partitionCtxt, nwPartitionCtxt);
-            ccClient.spawnAnInstance(partition, clusterId, lbClusterId, 
nwPartitionCtxt.getId());
-            if (log.isInfoEnabled()) {
-                log.info(String.format("Instance spawned for fault member: 
[partition] %s [cluster] %s [lb cluster] %s ", 
-                                       partitionId, clusterId, lbClusterId));
-            }                       
-
-        } catch (TerminationException e) {
-            log.error(e);
-        } catch (SpawningException e) {
-            log.error(e);
-        }
-    }
-
-    public Event jsonToEvent(String json) {
-
-        Event event = new Event();
-        BufferedReader bufferedReader = new BufferedReader(new 
StringReader(json));
-        JsonReader reader = new JsonReader(bufferedReader);
-        try {
-            reader.beginObject();
-
-            if (reader.hasNext()) {
-                event.setEventName(reader.nextName());
-
-                reader.beginObject();
-                Map<String, String> properties = new HashMap<String, String>();
-                while (reader.hasNext()) {
-                    String name = reader.nextName();
-                    String value = reader.nextString();
-                    properties.put(name, value);
-                }
-                event.setProperties(properties);
-            }
-            reader.close();
-            return event;
-        } catch (IOException e) {
-            log.error("Could not extract event");
-        }
-        return null;
-    }
-
-    private class Event {
-        private String eventName;
-        private Map<String, String> properties;
-
-        private String getEventName() {
-            return eventName;
-        }
-
-        private void setEventName(String eventName) {
-            this.eventName = eventName;
-        }
-
-        private Map<String, String> getProperties() {
-            return properties;
-        }
-
-        private void setProperties(Map<String, String> properties) {
-            this.properties = properties;
-        }
-    }
-    
-    public void terminate(){
-       this.terminate = true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/253bbb08/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageReceiver.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageReceiver.java
deleted file mode 100644
index 866cd87..0000000
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageReceiver.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.autoscaler.message.receiver.health;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.TextMessage;
-
-public class HealthEventMessageReceiver implements MessageListener {
-
-    private static final Log log = 
LogFactory.getLog(HealthEventMessageReceiver.class);
-
-    @Override
-    public void onMessage(Message message) {
-        if (message instanceof TextMessage) {
-            TextMessage receivedMessage = (TextMessage) message;
-            try {
-                if (log.isDebugEnabled()) {
-                    log.debug("Message received: " + ((TextMessage) 
message).getText());
-                }
-                // Add received message to the queue
-                HealthEventQueue.getInstance().add(receivedMessage);
-
-            } catch (JMSException e) {
-                log.error(e.getMessage(), e);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/253bbb08/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventQueue.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventQueue.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventQueue.java
deleted file mode 100644
index 731f25b..0000000
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventQueue.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.autoscaler.message.receiver.health;
-
-import javax.jms.TextMessage;
-import java.util.concurrent.LinkedBlockingQueue;
-
-/**
- * Implements topology event queue.
- */
-public class HealthEventQueue extends LinkedBlockingQueue<TextMessage>{
-
-       private static final long serialVersionUID = 2556240855574421561L;
-       private static volatile HealthEventQueue instance;
-
-    private HealthEventQueue(){
-    }
-
-    public static HealthEventQueue getInstance() {
-        if (instance == null) {
-            synchronized (HealthEventQueue.class){
-                if (instance == null) {
-                    instance = new HealthEventQueue();
-                }
-            }
-        }
-        return instance;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/253bbb08/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyReceiver.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyReceiver.java
index 74d9770..9b7965e 100644
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyReceiver.java
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyReceiver.java
@@ -82,6 +82,12 @@ public class AutoscalerTopologyReceiver implements Runnable {
 
     private TopologyEventMessageDelegator createMessageDelegator() {
         TopologyMessageProcessorChain processorChain = 
createEventProcessorChain();
+        return new TopologyEventMessageDelegator(processorChain);
+    }
+
+    private TopologyMessageProcessorChain createEventProcessorChain() {
+        // Listen to topology events that affect clusters
+        TopologyMessageProcessorChain processorChain = new 
TopologyMessageProcessorChain();
         processorChain.addEventListener(new CompleteTopologyEventListener() {
             @Override
             protected void onEvent(Event event) {
@@ -107,12 +113,7 @@ public class AutoscalerTopologyReceiver implements 
Runnable {
             }
 
         });
-        return new TopologyEventMessageDelegator(processorChain);
-    }
 
-    private TopologyMessageProcessorChain createEventProcessorChain() {
-        // Listen to topology events that affect clusters
-        TopologyMessageProcessorChain processorChain = new 
TopologyMessageProcessorChain();
         processorChain.addEventListener(new ClusterCreatedEventListener() {
             @Override
             protected void onEvent(Event event) {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/253bbb08/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/health/stat/SecondDerivativeOfMemoryConsumptionEvent.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/health/stat/SecondDerivativeOfMemoryConsumptionEvent.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/health/stat/SecondDerivativeOfMemoryConsumptionEvent.java
index 5d9e9af..bd36b76 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/health/stat/SecondDerivativeOfMemoryConsumptionEvent.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/health/stat/SecondDerivativeOfMemoryConsumptionEvent.java
@@ -25,10 +25,13 @@ import org.apache.stratos.messaging.event.Event;
  *  This event is fired by Event processing engine to send second derivative 
of memory consumption
  */
 public class SecondDerivativeOfMemoryConsumptionEvent extends Event {
+
+    private final String networkPartitionId;
     private final String clusterId;
     private final float value;
 
-    public SecondDerivativeOfMemoryConsumptionEvent(String clusterId, float 
value) {
+    public SecondDerivativeOfMemoryConsumptionEvent(String networkPartitionId, 
String clusterId, float value) {
+        this.networkPartitionId = networkPartitionId;
         this.clusterId = clusterId;
         this.value = value;
     }
@@ -41,4 +44,8 @@ public class SecondDerivativeOfMemoryConsumptionEvent extends 
Event {
     public float getValue() {
         return value;
     }
+
+    public String getNetworkPartitionId() {
+        return networkPartitionId;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/253bbb08/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java
index f2a9bc8..7786c39 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java
@@ -96,7 +96,7 @@ public class HealthStatEventMessageDelegator implements 
Runnable {
     }
 
 
-    public EventMessage jsonToEventMessage(String json) {
+    private EventMessage jsonToEventMessage(String json) {
 
         EventMessage event = new EventMessage();
         String message;
@@ -107,6 +107,9 @@ public class HealthStatEventMessageDelegator implements 
Runnable {
 
         String eventType = MessageParts[0].trim();
         eventType = eventType.substring(eventType.indexOf("\"") + 1, 
eventType.lastIndexOf("\""));
+        if(log.isDebugEnabled()){
+            log.debug(String.format("Extracted [event type] %s", eventType));
+        }
 
         event.setEventName(eventType);
         String messageTag = MessageParts[1];

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/253bbb08/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatReceiver.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatReceiver.java
new file mode 100644
index 0000000..728cd8a
--- /dev/null
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatReceiver.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.receiver.health.stat;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber;
+import org.apache.stratos.messaging.util.Constants;
+
+/**
+ * A thread for receiving health stat information from message broker
+ */
+public class HealthStatReceiver implements Runnable {
+    private static final Log log = LogFactory.getLog(HealthStatReceiver.class);
+    private HealthStatEventMessageDelegator messageDelegator;
+    private TopicSubscriber topicSubscriber;
+    private boolean terminated;
+
+    public HealthStatReceiver() {
+        this.messageDelegator = new HealthStatEventMessageDelegator();
+    }
+
+    public HealthStatReceiver(HealthStatEventMessageDelegator 
messageDelegator) {
+        this.messageDelegator = messageDelegator;
+    }
+
+    @Override
+    public void run() {
+        try {
+            // Start topic subscriber thread
+            topicSubscriber = new TopicSubscriber(Constants.HEALTH_STAT_TOPIC);
+            topicSubscriber.setMessageListener(new 
HealthStatEventMessageListener());
+            Thread subscriberThread = new Thread(topicSubscriber);
+            subscriberThread.start();
+            if (log.isDebugEnabled()) {
+                log.debug("Health stst event message receiver thread started");
+            }
+
+            // Start health stat event message delegator thread
+            Thread receiverThread = new Thread(messageDelegator);
+            receiverThread.start();
+            if (log.isDebugEnabled()) {
+                log.debug("Health stst event message delegator thread 
started");
+            }
+
+            // Keep the thread live until terminated
+            while (!terminated);
+        } catch (Exception e) {
+            if (log.isErrorEnabled()) {
+                log.error("Topology receiver failed", e);
+            }
+        }
+    }
+
+    public void terminate() {
+        topicSubscriber.terminate();
+        messageDelegator.terminate();
+        terminated = true;
+    }
+}

Reply via email to