Updated Branches: refs/heads/master 7ce95692f -> a6f454914
Accept fault member event messages Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/0859610c Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/0859610c Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/0859610c Branch: refs/heads/master Commit: 0859610c124ce27265cdfd8f53c366d338d0da5b Parents: 41f4e81 Author: Udara Liyanage <[email protected]> Authored: Sat Nov 30 21:50:53 2013 -0500 Committer: Udara Liyanage <[email protected]> Committed: Sat Nov 30 22:30:01 2013 -0500 ---------------------------------------------------------------------- .../health/HealthEventMessageDelegator.java | 82 ++++++++++---------- 1 file changed, 39 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/0859610c/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java index e7a03bd..61a1706 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java @@ -24,10 +24,14 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.autoscaler.AutoscalerContext; import org.apache.stratos.autoscaler.Constants; + import javax.jms.TextMessage; import java.io.BufferedReader; +import java.io.IOException; import java.io.StringReader; +import java.util.HashMap; +import java.util.Map; /** @@ -37,9 +41,7 @@ public class HealthEventMessageDelegator implements Runnable { private static final Log log = LogFactory.getLog(HealthEventMessageDelegator.class); private String eventName; - private float value; - private String clusterId; - + private Map<String, String> messageProperties; @Override public void run() { log.info("Health stat event message processor started"); @@ -50,11 +52,8 @@ public class HealthEventMessageDelegator implements Runnable { String messageText = message.getText(); - setEventValues(messageText); - - log.info(clusterId); - log.info(value); - log.info(eventName); + messageProperties = setEventValues(messageText); + log.info("Received event " + eventName); // for (Service service : TopologyManager.getTopology().getServices()){ // // if(service.clusterExists(clusterId)){ @@ -82,16 +81,27 @@ public class HealthEventMessageDelegator implements Runnable { // } // } if(Constants.AVERAGE_REQUESTS_IN_FLIGHT.equals(eventName)){ - AutoscalerContext.getInstance().getClusterContext(clusterId).setAverageRequestsInFlight(value); + String clusterId = messageProperties.get("cluster_id"); + Float messageValue = Float.parseFloat(messageProperties.get("value")); + AutoscalerContext.getInstance().getClusterContext(clusterId).setAverageRequestsInFlight(messageValue); } else if(Constants.GRADIENT_OF_REQUESTS_IN_FLIGHT.equals(eventName)){ - AutoscalerContext.getInstance().getClusterContext(clusterId).setRequestsInFlightGradient(value); + String clusterId = messageProperties.get("cluster_id"); + Float messageValue = Float.parseFloat(messageProperties.get("value")); + AutoscalerContext.getInstance().getClusterContext(clusterId).setRequestsInFlightGradient(messageValue); } else if(Constants.SECOND_DERIVATIVE_OF_REQUESTS_IN_FLIGHT.equals(eventName)){ - AutoscalerContext.getInstance().getClusterContext(clusterId).setRequestsInFlightSecondDerivative(value); + String clusterId = messageProperties.get("cluster_id"); + Float messageValue = Float.parseFloat(messageProperties.get("value")); + AutoscalerContext.getInstance().getClusterContext(clusterId).setRequestsInFlightSecondDerivative(messageValue); + }else if ("member_fault".equals(eventName)){ + // member with } - + + // clear the message properties after handling the message. + messageProperties.clear(); + } catch (Exception e) { String error = "Failed to retrieve the health stat event message."; log.error(error); @@ -99,48 +109,34 @@ public class HealthEventMessageDelegator implements Runnable { } } - public void setEventValues(String json) { - + public Map<String, String> setEventValues(String json) { + + Map<String, String> properties = new HashMap<String, String>(); + BufferedReader bufferedReader = new BufferedReader(new StringReader(json)); + JsonReader reader = new JsonReader(bufferedReader); try { - BufferedReader bufferedReader = new BufferedReader(new StringReader(json)); - JsonReader reader = new JsonReader(bufferedReader); + reader.beginObject(); if(reader.hasNext()) { - eventName = reader.nextName(); - reader.beginObject(); - if("cluster_id".equals(reader.nextName())) { - - if(reader.hasNext()){ - - clusterId = reader.nextString(); - } - } - if(reader.hasNext()) { - - if ("value".equals(reader.nextName())) { - - if(reader.hasNext()){ - - String stringValue = reader.nextString(); - try { - - value = Float.parseFloat(stringValue); - } catch (NumberFormatException ex) { - log.error("Error while converting health stat message value to float", ex); - } - } - } + + reader.beginObject(); + while(reader.hasNext()){ + String name = reader.nextName(); + String value = reader.nextString(); + properties.put(name, value); } + } - reader.close(); - - } catch (Exception e) { + reader.close(); + return properties; + }catch(IOException e) { log.error( "Could not extract message header"); // throw new RuntimeException("Could not extract message header", e); } + return null; } }
