Updated Branches: refs/heads/master 41687d86d -> 253bbb08a
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/253bbb08/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java deleted file mode 100644 index d1bcb1c..0000000 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java +++ /dev/null @@ -1,639 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.stratos.autoscaler.message.receiver.health; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.StringReader; -import java.util.HashMap; -import java.util.Map; - -import javax.jms.TextMessage; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.autoscaler.*; -import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient; -import org.apache.stratos.autoscaler.exception.SpawningException; -import org.apache.stratos.autoscaler.exception.TerminationException; -import org.apache.stratos.autoscaler.monitor.AbstractMonitor; -import org.apache.stratos.autoscaler.policy.model.LoadAverage; -import org.apache.stratos.autoscaler.policy.model.MemoryConsumption; -import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator; -import org.apache.stratos.cloud.controller.deployment.partition.Partition; -import org.apache.stratos.messaging.domain.topology.Cluster; -import org.apache.stratos.messaging.domain.topology.Member; -import org.apache.stratos.messaging.domain.topology.Service; -import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; - -import com.google.gson.stream.JsonReader; - - -/** - * A thread for processing topology messages and updating the topology data structure. 
- */ -public class HealthEventMessageDelegator implements Runnable { - - private static final Log log = LogFactory.getLog(HealthEventMessageDelegator.class); - private boolean terminate = false; - - @Override - public void run() { - if(log.isInfoEnabled()) { - log.info("Health event message delegator started"); - } - - if(log.isDebugEnabled()) { - log.debug("Waiting for topology to be initialized"); - } - while(!TopologyManager.getTopology().isInitialized()); - - while (!terminate) { - try { - TextMessage message = HealthEventQueue.getInstance().take(); - - String messageText = message.getText(); - if (log.isDebugEnabled()) { - log.debug("Health event message received: [message] " + messageText); - } - Event event = jsonToEvent(messageText); - String eventName = event.getEventName(); - - if (log.isInfoEnabled()) { - log.info(String.format("Received event: [event-name] %s", eventName)); - } - - if (Constants.AVERAGE_REQUESTS_IN_FLIGHT.equals(eventName)) { - String clusterId = event.getProperties().get("cluster_id"); - String networkPartitionId = event.getProperties().get("network_partition_id"); - String value = event.getProperties().get("value"); - Float floatValue = Float.parseFloat(value); - - if (log.isDebugEnabled()) { - log.debug(String.format("%s event: [cluster] %s [network-partition] %s [value] %s", eventName, - clusterId, networkPartitionId, value)); - } - - AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(clusterId); - if(null != monitor){ - NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId); - if(null != networkPartitionContext){ - networkPartitionContext.setAverageRequestsInFlight(floatValue); - } else { - if(log.isErrorEnabled()) { - log.error(String.format("Network partition context is not available for :" + - " [network partition] %s", networkPartitionId)); - } - } - } else { - - if(log.isErrorEnabled()) { - log.error(String.format("Cluster monitor is not available for : [cluster] 
%s", clusterId)); - } - } - - } else if (Constants.GRADIENT_OF_REQUESTS_IN_FLIGHT.equals(eventName)) { - String clusterId = event.getProperties().get("cluster_id"); - String networkPartitionId = event.getProperties().get("network_partition_id"); - String value = event.getProperties().get("value"); - Float floatValue = Float.parseFloat(value); - - if (log.isDebugEnabled()) { - log.debug(String.format("%s event: [cluster] %s [network-partition] %s [value] %s", eventName, - clusterId, networkPartitionId, value)); - } - AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(clusterId); - if(null != monitor){ - NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId); - if(null != networkPartitionContext){ - networkPartitionContext.setRequestsInFlightGradient(floatValue); - } else { - if(log.isErrorEnabled()) { - log.error(String.format("Network partition context is not available for :" + - " [network partition] %s", networkPartitionId)); - } - } - } else { - - if(log.isErrorEnabled()) { - log.error(String.format("Cluster monitor is not available for : [cluster] %s", clusterId)); - } - } - - } else if (Constants.SECOND_DERIVATIVE_OF_REQUESTS_IN_FLIGHT.equals(eventName)) { - String clusterId = event.getProperties().get("cluster_id"); - String networkPartitionId = event.getProperties().get("network_partition_id"); - String value = event.getProperties().get("value"); - Float floatValue = Float.parseFloat(value); - - if (log.isDebugEnabled()) { - log.debug(String.format("%s event: [cluster] %s [network-partition] %s [value] %s", eventName, - clusterId, networkPartitionId, value)); - } - AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(clusterId); - if(null != monitor){ - NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId); - if(null != networkPartitionContext){ - networkPartitionContext.setRequestsInFlightSecondDerivative(floatValue); - } else { - 
if(log.isErrorEnabled()) { - log.error(String.format("Network partition context is not available for :" + - " [network partition] %s", networkPartitionId)); - } - } - } else { - - if(log.isErrorEnabled()) { - log.error(String.format("Cluster monitor is not available for : [cluster] %s", clusterId)); - } - } - } else if (Constants.MEMBER_FAULT_EVENT_NAME.equals(eventName)) { - String clusterId = event.getProperties().get("cluster_id"); - String memberId = event.getProperties().get("member_id"); - - if (memberId == null || memberId.isEmpty()) { - if(log.isErrorEnabled()) { - log.error("Member id not found in received message"); - } - } else { - handleMemberFaultEvent(clusterId, memberId); - } - } else if(Constants.MEMBER_AVERAGE_LOAD_AVERAGE.equals(eventName)) { - LoadAverage loadAverage = findLoadAverage(event); - if(loadAverage != null) { - String value = event.getProperties().get("value"); - Float floatValue = Float.parseFloat(value); - loadAverage.setAverage(floatValue); - - if (log.isDebugEnabled()) { - log.debug(String.format("%s event: [member] %s [value] %s", event, event.getProperties().get("member_id"), value)); - } - } - } else if(Constants.MEMBER_SECOND_DERIVATIVE_OF_LOAD_AVERAGE.equals(eventName)) { - LoadAverage loadAverage = findLoadAverage(event); - if(loadAverage != null) { - String value = event.getProperties().get("value"); - Float floatValue = Float.parseFloat(value); - loadAverage.setSecondDerivative(floatValue); - - if (log.isDebugEnabled()) { - log.debug(String.format("%s event: [member] %s [value] %s", event, event.getProperties().get("member_id"), value)); - } - } - } else if(Constants.MEMBER_GRADIENT_LOAD_AVERAGE.equals(eventName)) { - LoadAverage loadAverage = findLoadAverage(event); - if(loadAverage != null) { - String value = event.getProperties().get("value"); - Float floatValue = Float.parseFloat(value); - loadAverage.setGradient(floatValue); - - if (log.isDebugEnabled()) { - log.debug(String.format("%s event: [member] %s [value] %s", 
event, event.getProperties().get("member_id"), value)); - } - } - } else if(Constants.MEMBER_AVERAGE_MEMORY_CONSUMPTION.equals(eventName)) { - MemoryConsumption memoryConsumption = findMemoryConsumption(event); - if(memoryConsumption != null) { - String value = event.getProperties().get("value"); - Float floatValue = Float.parseFloat(value); - memoryConsumption.setAverage(floatValue); - - if (log.isDebugEnabled()) { - log.debug(String.format("%s event: [member] %s [value] %s", event, event.getProperties().get("member_id"), value)); - } - } - } else if(Constants.MEMBER_SECOND_DERIVATIVE_OF_MEMORY_CONSUMPTION.equals(eventName)) { - MemoryConsumption memoryConsumption = findMemoryConsumption(event); - if(memoryConsumption != null) { - String value = event.getProperties().get("value"); - Float floatValue = Float.parseFloat(value); - memoryConsumption.setSecondDerivative(floatValue); - - if (log.isDebugEnabled()) { - log.debug(String.format("%s event: [member] %s [value] %s", event, event.getProperties().get("member_id"), value)); - } - } - } else if(Constants.MEMBER_GRADIENT_MEMORY_CONSUMPTION.equals(eventName)) { - MemoryConsumption memoryConsumption = findMemoryConsumption(event); - if(memoryConsumption != null) { - String value = event.getProperties().get("value"); - Float floatValue = Float.parseFloat(value); - memoryConsumption.setGradient(floatValue); - - if (log.isDebugEnabled()) { - log.debug(String.format("%s event: [member] %s [value] %s",event, event.getProperties().get("member_id"), value)); - } - } - - } else if(Constants.AVERAGE_LOAD_AVERAGE.equals(eventName)) { - - String clusterId = event.getProperties().get("cluster_id"); - String networkPartitionId = event.getProperties().get("network_partition_id"); - String value = event.getProperties().get("value"); - Float floatValue = Float.parseFloat(value); - - if (log.isDebugEnabled()) { - log.debug(String.format("%s event: [cluster] %s [network-partition] %s [value] %s", eventName, - clusterId, 
networkPartitionId, value)); - } - AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(clusterId); - if(null != monitor){ - NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId); - if(null != networkPartitionContext){ - networkPartitionContext.setAverageLoadAverage(floatValue); - } else { - if(log.isErrorEnabled()) { - log.error(String.format("Network partition context is not available for :" + - " [network partition] %s", networkPartitionId)); - } - } - } else { - - if(log.isErrorEnabled()) { - log.error(String.format("Cluster monitor is not available for : [cluster] %s", clusterId)); - } - } - } else if(Constants.SECOND_DERIVATIVE_OF_LOAD_AVERAGE.equals(eventName)) { - - String clusterId = event.getProperties().get("cluster_id"); - String networkPartitionId = event.getProperties().get("network_partition_id"); - String value = event.getProperties().get("value"); - Float floatValue = Float.parseFloat(value); - - if (log.isDebugEnabled()) { - log.debug(String.format("%s event: [cluster] %s [network-partition] %s [value] %s", eventName, - clusterId, networkPartitionId, value)); - } - AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(clusterId); - if(null != monitor){ - NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId); - if(null != networkPartitionContext){ - networkPartitionContext.setLoadAverageSecondDerivative(floatValue); - } else { - if(log.isErrorEnabled()) { - log.error(String.format("Network partition context is not available for :" + - " [network partition] %s", networkPartitionId)); - } - } - } else { - - if(log.isErrorEnabled()) { - log.error(String.format("Cluster monitor is not available for : [cluster] %s", clusterId)); - } - } - } else if(Constants.GRADIENT_LOAD_AVERAGE.equals(eventName)) { - - String clusterId = event.getProperties().get("cluster_id"); - String networkPartitionId = 
event.getProperties().get("network_partition_id"); - String value = event.getProperties().get("value"); - Float floatValue = Float.parseFloat(value); - - if (log.isDebugEnabled()) { - log.debug(String.format("%s event: [cluster] %s [network-partition] %s [value] %s", eventName, - clusterId, networkPartitionId, value)); - } - AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(clusterId); - if(null != monitor){ - NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId); - if(null != networkPartitionContext){ - networkPartitionContext.setLoadAverageGradient(floatValue); - } else { - if(log.isErrorEnabled()) { - log.error(String.format("Network partition context is not available for :" + - " [network partition] %s", networkPartitionId)); - } - } - } else { - - if(log.isErrorEnabled()) { - log.error(String.format("Cluster monitor is not available for : [cluster] %s", clusterId)); - } - } - } else if(Constants.AVERAGE_MEMORY_CONSUMPTION.equals(eventName)) { - - String clusterId = event.getProperties().get("cluster_id"); - String networkPartitionId = event.getProperties().get("network_partition_id"); - String value = event.getProperties().get("value"); - Float floatValue = Float.parseFloat(value); - - if (log.isDebugEnabled()) { - log.debug(String.format("%s event: [cluster] %s [network-partition] %s [value] %s", eventName, - clusterId, networkPartitionId, value)); - } - AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(clusterId); - if(null != monitor){ - NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId); - if(null != networkPartitionContext){ - networkPartitionContext.setAverageMemoryConsumption(floatValue); - } else { - if(log.isErrorEnabled()) { - log.error(String.format("Network partition context is not available for :" + - " [network partition] %s", networkPartitionId)); - } - } - } else { - - if(log.isErrorEnabled()) { - 
log.error(String.format("Cluster monitor is not available for : [cluster] %s", clusterId)); - } - } - } else if(Constants.SECOND_DERIVATIVE_OF_MEMORY_CONSUMPTION.equals(eventName)) { - - String clusterId = event.getProperties().get("cluster_id"); - String networkPartitionId = event.getProperties().get("network_partition_id"); - String value = event.getProperties().get("value"); - Float floatValue = Float.parseFloat(value); - - if (log.isDebugEnabled()) { - log.debug(String.format("%s event: [cluster] %s [network-partition] %s [value] %s", eventName, - clusterId, networkPartitionId, value)); - } - AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(clusterId); - if(null != monitor){ - NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId); - if(null != networkPartitionContext){ - networkPartitionContext.setMemoryConsumptionSecondDerivative(floatValue); - } else { - if(log.isErrorEnabled()) { - log.error(String.format("Network partition context is not available for :" + - " [network partition] %s", networkPartitionId)); - } - } - } else { - - if(log.isErrorEnabled()) { - log.error(String.format("Cluster monitor is not available for : [cluster] %s", clusterId)); - } - } - } else if(Constants.GRADIENT_MEMORY_CONSUMPTION.equals(eventName)) { - - String clusterId = event.getProperties().get("cluster_id"); - String networkPartitionId = event.getProperties().get("network_partition_id"); - String value = event.getProperties().get("value"); - Float floatValue = Float.parseFloat(value); - - if (log.isDebugEnabled()) { - log.debug(String.format("%s event: [cluster] %s [network-partition] %s [value] %s", eventName, - clusterId, networkPartitionId, value)); - } - AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(clusterId); - if(null != monitor){ - NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId); - if(null != networkPartitionContext){ - 
networkPartitionContext.setMemoryConsumptionGradient(floatValue); - } else { - if(log.isErrorEnabled()) { - log.error(String.format("Network partition context is not available for :" + - " [network partition] %s", networkPartitionId)); - } - } - } else { - - if(log.isErrorEnabled()) { - log.error(String.format("Cluster monitor is not available for : [cluster] %s", clusterId)); - } - } - } - } catch (Exception e) { - log.error("Failed to retrieve the health stat event message.", e); - } - } - log.warn("Health event Message delegater is terminated"); - } - - private LoadAverage findLoadAverage(Event event) { - String memberId = event.getProperties().get("member_id"); - Member member = findMember(memberId); - - if(null == member){ - if(log.isErrorEnabled()) { - log.error(String.format("Member not found: [member] %s", memberId)); - } - return null; - } - AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(member.getClusterId()); - if(null == monitor){ - if(log.isErrorEnabled()) { - log.error(String.format("Cluster monitor is not available for : [member] %s", memberId)); - } - return null; - } - String networkPartitionId = findNetworkPartitionId(memberId); - MemberStatsContext memberStatsContext = monitor.getNetworkPartitionCtxt(networkPartitionId) - .getPartitionCtxt(member.getPartitionId()) - .getMemberStatsContext(memberId); - if(null == memberStatsContext){ - if(log.isErrorEnabled()) { - log.error(String.format("Member context is not available for : [member] %s", memberId)); - } - return null; - } - else if(!member.isActive()){ - if(log.isDebugEnabled()){ - log.debug(String.format("Member activated event has not received for the member %s. 
Therefore ignoring" + - " the health stat", memberId)); - } - return null; - } - - LoadAverage loadAverage = memberStatsContext.getLoadAverage(); - return loadAverage; - } - - private MemoryConsumption findMemoryConsumption(Event event) { - String memberId = event.getProperties().get("member_id"); - Member member = findMember(memberId); - - if(null == member){ - if(log.isErrorEnabled()) { - log.error(String.format("Member not found: [member] %s", memberId)); - } - return null; - } - - AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(member.getClusterId()); - if(null == monitor){ - if(log.isErrorEnabled()) { - log.error(String.format("Cluster monitor is not available for : [member] %s", memberId)); - } - return null; - } - - - String networkPartitionId = findNetworkPartitionId(memberId); - MemberStatsContext memberStatsContext = monitor.getNetworkPartitionCtxt(networkPartitionId) - .getPartitionCtxt(member.getPartitionId()) - .getMemberStatsContext(memberId); - if(null == memberStatsContext){ - if(log.isErrorEnabled()) { - log.error(String.format("Member context is not available for : [member] %s", memberId)); - } - return null; - }else if(!member.isActive()){ - if(log.isDebugEnabled()){ - log.debug(String.format("Member activated event has not received for the member %s. 
Therefore ignoring" + - " the health stat", memberId)); - } - return null; - } - MemoryConsumption memoryConsumption = memberStatsContext.getMemoryConsumption(); - - return memoryConsumption; - } - - private String findNetworkPartitionId(String memberId) { - for(Service service: TopologyManager.getTopology().getServices()){ - for(Cluster cluster: service.getClusters()){ - if(cluster.memberExists(memberId)){ - return cluster.getMember(memberId).getNetworkPartitionId(); - } - } - } - return null; - } - - private Member findMember(String memberId) { - try { - TopologyManager.acquireReadLock(); - for(Service service : TopologyManager.getTopology().getServices()) { - for(Cluster cluster : service.getClusters()) { - if(cluster.memberExists(memberId)) { - return cluster.getMember(memberId); - } - } - } - return null; - } - finally { - TopologyManager.releaseReadLock(); - } - } - - private void handleMemberFaultEvent(String clusterId, String memberId) { - try { - AutoscalerContext asCtx = AutoscalerContext.getInstance(); - AbstractMonitor monitor; - - if(asCtx.moniterExist(clusterId)){ - monitor = asCtx.getMonitor(clusterId); - }else if(asCtx.lbMoniterExist(clusterId)){ - monitor = asCtx.getLBMonitor(clusterId); - }else{ - String errMsg = "A monitor is not found for this custer"; - log.error(errMsg); - throw new RuntimeException(errMsg); - } - - NetworkPartitionContext nwPartitionCtxt; - try{ - TopologyManager.acquireReadLock(); - Member member = findMember(memberId); - - if(null == member){ - return; - } - if(!member.isActive()){ - if(log.isDebugEnabled()){ - log.debug(String.format("Member activated event has not received for the member %s. 
Therefore ignoring" + - " the member fault health stat", memberId)); - } - return; - } - - nwPartitionCtxt = monitor.getNetworkPartitionCtxt(member); - - }finally{ - TopologyManager.releaseReadLock(); - } - // terminate the faulty member - CloudControllerClient ccClient = CloudControllerClient.getInstance(); - ccClient.terminate(memberId); - - // start a new member in the same Partition - String partitionId = monitor.getPartitionOfMember(memberId); - Partition partition = monitor.getDeploymentPolicy().getPartitionById(partitionId); - PartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId); - - String lbClusterId = AutoscalerRuleEvaluator.getLbClusterId(partitionCtxt, nwPartitionCtxt); - ccClient.spawnAnInstance(partition, clusterId, lbClusterId, nwPartitionCtxt.getId()); - if (log.isInfoEnabled()) { - log.info(String.format("Instance spawned for fault member: [partition] %s [cluster] %s [lb cluster] %s ", - partitionId, clusterId, lbClusterId)); - } - - } catch (TerminationException e) { - log.error(e); - } catch (SpawningException e) { - log.error(e); - } - } - - public Event jsonToEvent(String json) { - - Event event = new Event(); - BufferedReader bufferedReader = new BufferedReader(new StringReader(json)); - JsonReader reader = new JsonReader(bufferedReader); - try { - reader.beginObject(); - - if (reader.hasNext()) { - event.setEventName(reader.nextName()); - - reader.beginObject(); - Map<String, String> properties = new HashMap<String, String>(); - while (reader.hasNext()) { - String name = reader.nextName(); - String value = reader.nextString(); - properties.put(name, value); - } - event.setProperties(properties); - } - reader.close(); - return event; - } catch (IOException e) { - log.error("Could not extract event"); - } - return null; - } - - private class Event { - private String eventName; - private Map<String, String> properties; - - private String getEventName() { - return eventName; - } - - private void setEventName(String 
eventName) { - this.eventName = eventName; - } - - private Map<String, String> getProperties() { - return properties; - } - - private void setProperties(Map<String, String> properties) { - this.properties = properties; - } - } - - public void terminate(){ - this.terminate = true; - } -} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/253bbb08/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageReceiver.java deleted file mode 100644 index 866cd87..0000000 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageReceiver.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.stratos.autoscaler.message.receiver.health; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; -import javax.jms.TextMessage; - -public class HealthEventMessageReceiver implements MessageListener { - - private static final Log log = LogFactory.getLog(HealthEventMessageReceiver.class); - - @Override - public void onMessage(Message message) { - if (message instanceof TextMessage) { - TextMessage receivedMessage = (TextMessage) message; - try { - if (log.isDebugEnabled()) { - log.debug("Message received: " + ((TextMessage) message).getText()); - } - // Add received message to the queue - HealthEventQueue.getInstance().add(receivedMessage); - - } catch (JMSException e) { - log.error(e.getMessage(), e); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/253bbb08/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventQueue.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventQueue.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventQueue.java deleted file mode 100644 index 731f25b..0000000 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventQueue.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.stratos.autoscaler.message.receiver.health; - -import javax.jms.TextMessage; -import java.util.concurrent.LinkedBlockingQueue; - -/** - * Implements topology event queue. - */ -public class HealthEventQueue extends LinkedBlockingQueue<TextMessage>{ - - private static final long serialVersionUID = 2556240855574421561L; - private static volatile HealthEventQueue instance; - - private HealthEventQueue(){ - } - - public static HealthEventQueue getInstance() { - if (instance == null) { - synchronized (HealthEventQueue.class){ - if (instance == null) { - instance = new HealthEventQueue(); - } - } - } - return instance; - } -} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/253bbb08/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyReceiver.java index 74d9770..9b7965e 100644 --- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyReceiver.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyReceiver.java @@ -82,6 +82,12 @@ public class AutoscalerTopologyReceiver implements Runnable { private TopologyEventMessageDelegator createMessageDelegator() { TopologyMessageProcessorChain processorChain = createEventProcessorChain(); + return new TopologyEventMessageDelegator(processorChain); + } + + private TopologyMessageProcessorChain createEventProcessorChain() { + // Listen to topology events that affect clusters + TopologyMessageProcessorChain processorChain = new TopologyMessageProcessorChain(); processorChain.addEventListener(new CompleteTopologyEventListener() { @Override protected void onEvent(Event event) { @@ -107,12 +113,7 @@ public class AutoscalerTopologyReceiver implements Runnable { } }); - return new TopologyEventMessageDelegator(processorChain); - } - private TopologyMessageProcessorChain createEventProcessorChain() { - // Listen to topology events that affect clusters - TopologyMessageProcessorChain processorChain = new TopologyMessageProcessorChain(); processorChain.addEventListener(new ClusterCreatedEventListener() { @Override protected void onEvent(Event event) { http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/253bbb08/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/health/stat/SecondDerivativeOfMemoryConsumptionEvent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/health/stat/SecondDerivativeOfMemoryConsumptionEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/health/stat/SecondDerivativeOfMemoryConsumptionEvent.java index 
5d9e9af..bd36b76 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/health/stat/SecondDerivativeOfMemoryConsumptionEvent.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/health/stat/SecondDerivativeOfMemoryConsumptionEvent.java @@ -25,10 +25,13 @@ import org.apache.stratos.messaging.event.Event; * This event is fired by Event processing engine to send second derivative of memory consumption */ public class SecondDerivativeOfMemoryConsumptionEvent extends Event { + + private final String networkPartitionId; private final String clusterId; private final float value; - public SecondDerivativeOfMemoryConsumptionEvent(String clusterId, float value) { + public SecondDerivativeOfMemoryConsumptionEvent(String networkPartitionId, String clusterId, float value) { + this.networkPartitionId = networkPartitionId; this.clusterId = clusterId; this.value = value; } @@ -41,4 +44,8 @@ public class SecondDerivativeOfMemoryConsumptionEvent extends Event { public float getValue() { return value; } + + public String getNetworkPartitionId() { + return networkPartitionId; + } } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/253bbb08/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java index f2a9bc8..7786c39 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java +++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java @@ -96,7 +96,7 @@ public class HealthStatEventMessageDelegator implements Runnable { } - public EventMessage jsonToEventMessage(String json) { + private EventMessage jsonToEventMessage(String json) { EventMessage event = new EventMessage(); String message; @@ -107,6 +107,9 @@ public class HealthStatEventMessageDelegator implements Runnable { String eventType = MessageParts[0].trim(); eventType = eventType.substring(eventType.indexOf("\"") + 1, eventType.lastIndexOf("\"")); + if(log.isDebugEnabled()){ + log.debug(String.format("Extracted [event type] %s", eventType)); + } event.setEventName(eventType); String messageTag = MessageParts[1]; http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/253bbb08/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatReceiver.java new file mode 100644 index 0000000..728cd8a --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatReceiver.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.messaging.message.receiver.health.stat; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber; +import org.apache.stratos.messaging.util.Constants; + +/** + * A thread that subscribes to the health stat topic on the message broker, starts the message delegator thread, and then stays alive until {@link #terminate()} is called. + */ +public class HealthStatReceiver implements Runnable { + private static final Log log = LogFactory.getLog(HealthStatReceiver.class); + private HealthStatEventMessageDelegator messageDelegator; + private volatile TopicSubscriber topicSubscriber; // assigned in run(), read by terminate() from another thread + private volatile boolean terminated; // set by terminate(), polled by run(); volatile so the write becomes visible to the polling loop + + public HealthStatReceiver() { + this.messageDelegator = new HealthStatEventMessageDelegator(); + } + + public HealthStatReceiver(HealthStatEventMessageDelegator messageDelegator) { + this.messageDelegator = messageDelegator; + } + + @Override + public void run() { + try { + // Start topic subscriber thread + topicSubscriber = new TopicSubscriber(Constants.HEALTH_STAT_TOPIC); + topicSubscriber.setMessageListener(new HealthStatEventMessageListener()); + Thread subscriberThread = new Thread(topicSubscriber); + subscriberThread.start(); + if (log.isDebugEnabled()) { + log.debug("Health stat event message receiver thread started"); + } + + // Start health stat event message delegator thread + Thread receiverThread = new Thread(messageDelegator); + receiverThread.start(); + if (log.isDebugEnabled()) { + log.debug("Health stat event message delegator thread started"); + } + + // Keep the thread live until terminated; the flag is volatile so this loop observes terminate() (NOTE(review): this is still a busy-wait and occupies a core while it polls) + while
(!terminated); + } catch (Exception e) { + if (log.isErrorEnabled()) { + log.error("Health stat receiver failed", e); + } + } + } + + public void terminate() { + terminated = true; // release run() first, so it exits even if a terminate() call below throws + if (topicSubscriber != null) { topicSubscriber.terminate(); } // still null when run() has not created the subscriber yet + messageDelegator.terminate(); + } +}
