Implemented in-flight request count logic.

Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/b641e85f
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/b641e85f
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/b641e85f

Branch: refs/heads/gsoc-projects-2015
Commit: b641e85f28905ec14ec3db79472c7cf95d561268
Parents: cae92c5
Author: swapnilpatilRajaram <[email protected]>
Authored: Thu Aug 13 15:58:05 2015 +0000
Committer: swapnilpatilRajaram <[email protected]>
Committed: Thu Aug 13 15:58:05 2015 +0000

----------------------------------------------------------------------
 .../aws-extension/src/main/conf/aws.properties  |   3 +
 .../apache/stratos/aws/extension/AWSHelper.java | 165 +++++++++++++++++++
 .../stratos/aws/extension/AWSLoadBalancer.java  |   6 +-
 .../aws/extension/AWSStatisticsReader.java      |  35 +++-
 .../apache/stratos/aws/extension/Constants.java |  49 +++---
 5 files changed, 235 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/b641e85f/extensions/load-balancer/aws-extension/src/main/conf/aws.properties
----------------------------------------------------------------------
diff --git 
a/extensions/load-balancer/aws-extension/src/main/conf/aws.properties 
b/extensions/load-balancer/aws-extension/src/main/conf/aws.properties
index d4cc18a..2bb2879 100644
--- a/extensions/load-balancer/aws-extension/src/main/conf/aws.properties
+++ b/extensions/load-balancer/aws-extension/src/main/conf/aws.properties
@@ -10,3 +10,6 @@ allowed-cidr-ip=0.0.0.0/0
 # Internet Protocol allowed for incoming requests for security group mentioned 
in 'load-balancer-security-group-name'. 
 # Comma separated e.g. tcp,udp
 allowed-protocols=tcp
+# statistics-interval is the interval, in seconds, over which statistics are 
gathered to calculate the in-flight request count.
+# This must be a multiple of 60.
+statistics-interval=60

http://git-wip-us.apache.org/repos/asf/stratos/blob/b641e85f/extensions/load-balancer/aws-extension/src/main/java/org/apache/stratos/aws/extension/AWSHelper.java
----------------------------------------------------------------------
diff --git 
a/extensions/load-balancer/aws-extension/src/main/java/org/apache/stratos/aws/extension/AWSHelper.java
 
b/extensions/load-balancer/aws-extension/src/main/java/org/apache/stratos/aws/extension/AWSHelper.java
index b822939..7929099 100644
--- 
a/extensions/load-balancer/aws-extension/src/main/java/org/apache/stratos/aws/extension/AWSHelper.java
+++ 
b/extensions/load-balancer/aws-extension/src/main/java/org/apache/stratos/aws/extension/AWSHelper.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Date;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Properties;
@@ -35,11 +36,18 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.load.balancer.common.domain.*;
 import 
org.apache.stratos.load.balancer.extension.api.exception.LoadBalancerExtensionException;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
 
 import com.amazonaws.AmazonClientException;
 import com.amazonaws.AmazonServiceException;
 import com.amazonaws.ClientConfiguration;
 import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient;
+import com.amazonaws.services.cloudwatch.model.Datapoint;
+import com.amazonaws.services.cloudwatch.model.Dimension;
+import com.amazonaws.services.cloudwatch.model.GetMetricStatisticsRequest;
+import com.amazonaws.services.cloudwatch.model.GetMetricStatisticsResult;
 import com.amazonaws.services.ec2.AmazonEC2Client;
 import com.amazonaws.services.ec2.model.AuthorizeSecurityGroupIngressRequest;
 import com.amazonaws.services.ec2.model.CreateSecurityGroupRequest;
@@ -58,6 +66,7 @@ public class AWSHelper {
        private String lbSecurityGroupName;
        private String lbSecurityGroupDescription;
        private String allowedCidrIpForLBSecurityGroup;
+       private int statisticsInterval;
 
        private AtomicInteger lbSequence;
 
@@ -70,6 +79,7 @@ public class AWSHelper {
 
        AmazonElasticLoadBalancingClient elbClient;
        AmazonEC2Client ec2Client;
+       private AmazonCloudWatchClient cloudWatchClient;
 
        private static final Log log = LogFactory.getLog(AWSHelper.class);
 
@@ -141,6 +151,25 @@ public class AWSHelper {
                                
this.allowedProtocolsForLBSecurityGroup.add(protocol);
                        }
 
+                       String interval = properties
+                                       
.getProperty(Constants.STATISTICS_INTERVAL);
+
+                       if (interval == null || interval.isEmpty()) {
+                               this.statisticsInterval = 
Constants.STATISTICS_INTERVAL_MULTIPLE_OF;
+                       } else {
+                               try {
+                                       this.statisticsInterval = 
Integer.parseInt(interval);
+
+                                       if (this.statisticsInterval
+                                                       % 
Constants.STATISTICS_INTERVAL_MULTIPLE_OF != 0) {
+                                               this.statisticsInterval = 
Constants.STATISTICS_INTERVAL_MULTIPLE_OF;
+                                       }
+                               } catch (NumberFormatException e) {
+                                       log.warn("Invalid statistics interval. 
Setting it to 15.");
+                                       this.statisticsInterval = 15;
+                               }
+                       }
+
                        this.lbSecurityGroupDescription = 
Constants.LOAD_BALANCER_SECURITY_GROUP_DESCRIPTION;
 
                        regionToSecurityGroupIdMap = new 
ConcurrentHashMap<String, String>();
@@ -153,6 +182,9 @@ public class AWSHelper {
 
                        ec2Client = new AmazonEC2Client(awsCredentials, 
clientConfiguration);
 
+                       cloudWatchClient = new 
AmazonCloudWatchClient(awsCredentials,
+                                       clientConfiguration);
+
                } catch (IOException e) {
                        log.error("Error reading aws configuration file.");
                        throw new LoadBalancerExtensionException(
@@ -166,6 +198,10 @@ public class AWSHelper {
                }
        }
 
+       /**
+        * Returns the statistics interval held by this helper.
+        *
+        * @return the current value of the {@code statisticsInterval} field
+        */
+       public int getStatisticsInterval() {
+               return this.statisticsInterval;
+       }
+
        public int getNextLBSequence() {
                return lbSequence.getAndIncrement();
        }
@@ -582,6 +618,135 @@ public class AWSHelper {
        }
 
        /**
+        * Returns the number of requests CloudWatch reports for the given load
+        * balancer over the last {@code timeInterval} seconds.
+        * <p>
+        * The "Sum" statistic of the request-count metric is queried with a
+        * period equal to {@code timeInterval}. Every returned datapoint is added
+        * up instead of reading only the first one, so the result does not depend
+        * on how many datapoints come back or on their order. If the CloudWatch
+        * call fails, the error is logged and 0 is returned.
+        *
+        * @param loadBalancerName
+        *            name of the load balancer, used as the metric dimension value
+        * @param region
+        *            region used to build the CloudWatch endpoint
+        * @param timeInterval
+        *            in seconds
+        * @return the summed request count, or 0 when no data is available
+        */
+       public int getRequestCount(String loadBalancerName, String region,
+                       int timeInterval) {
+               int count = 0;
+
+               try {
+                       // Take "now" once so that start and end are exactly
+                       // timeInterval seconds apart.
+                       DateTime now = new DateTime(DateTimeZone.UTC);
+                       Date currentTime = now.toDate();
+                       Date pastTime = now.minusSeconds(timeInterval).toDate();
+
+                       GetMetricStatisticsRequest request = new GetMetricStatisticsRequest();
+                       request.setMetricName(Constants.REQUEST_COUNT_METRIC_NAME);
+                       request.setNamespace(Constants.CLOUD_WATCH_NAMESPACE_NAME);
+                       request.setStartTime(pastTime);
+                       request.setEndTime(currentTime);
+                       request.setPeriod(timeInterval);
+
+                       HashSet<String> statistics = new HashSet<String>();
+                       statistics.add(Constants.SUM_STATISTICS_NAME);
+                       request.setStatistics(statistics);
+
+                       HashSet<Dimension> dimensions = new HashSet<Dimension>();
+                       Dimension loadBalancerDimension = new Dimension();
+                       loadBalancerDimension
+                                       .setName(Constants.LOAD_BALANCER_DIMENTION_NAME);
+                       loadBalancerDimension.setValue(loadBalancerName);
+                       dimensions.add(loadBalancerDimension);
+                       request.setDimensions(dimensions);
+
+                       cloudWatchClient.setEndpoint(String.format(
+                                       Constants.CLOUD_WATCH_ENDPOINT_URL_FORMAT, region));
+
+                       GetMetricStatisticsResult result = cloudWatchClient
+                                       .getMetricStatistics(request);
+
+                       List<Datapoint> dataPoints = result.getDatapoints();
+
+                       if (dataPoints != null) {
+                               // Add up every datapoint; a missing Sum is skipped
+                               // instead of causing a NullPointerException.
+                               double sum = 0;
+                               for (Datapoint dataPoint : dataPoints) {
+                                       Double value = dataPoint.getSum();
+                                       if (value != null) {
+                                               sum += value;
+                                       }
+                               }
+                               count = (int) sum;
+                       }
+
+               } catch (AmazonClientException e) {
+                       log.error(
+                                       "Could not get request count statistics of load balancer "
+                                                       + loadBalancerName, e);
+               }
+
+               return count;
+       }
+
+       /**
+        * Adds up the backend 2XX, 3XX, 4XX and 5XX response counts of a load
+        * balancer over the last {@code timeInterval} seconds.
+        * <p>
+        * The time window is computed once and the same start and end are passed
+        * to every per-metric query.
+        *
+        * @param loadBalancerName
+        *            name of the load balancer to query
+        * @param region
+        *            region passed through to the per-metric query
+        * @param timeInterval
+        *            window length in seconds, also passed on as the period
+        * @return total of the four per-metric counts
+        */
+       public int getAllResponsesCount(String loadBalancerName, String region,
+                       int timeInterval) {
+               Date windowEnd = new DateTime(DateTimeZone.UTC).toDate();
+               Date windowStart = new DateTime(DateTimeZone.UTC)
+                               .minusSeconds(timeInterval).toDate();
+
+               String[] responseMetrics = { Constants.HTTP_RESPONSE_2XX,
+                               Constants.HTTP_RESPONSE_3XX,
+                               Constants.HTTP_RESPONSE_4XX,
+                               Constants.HTTP_RESPONSE_5XX };
+
+               int responses = 0;
+               for (String metric : responseMetrics) {
+                       responses += getResponseCountForMetric(loadBalancerName,
+                                       region, metric, windowStart, windowEnd,
+                                       timeInterval);
+               }
+
+               return responses;
+       }
+
+       /**
+        * Returns the "Sum" statistic of one CloudWatch metric for the given load
+        * balancer between {@code startTime} and {@code endTime}.
+        * <p>
+        * Every returned datapoint is added up instead of reading only the first
+        * one, so the result does not depend on how many datapoints come back or
+        * on their order. If the CloudWatch call fails, the error is logged and 0
+        * is returned.
+        *
+        * @param loadBalancerName
+        *            name of the load balancer, used as the metric dimension value
+        * @param region
+        *            region used to build the CloudWatch endpoint
+        * @param metricName
+        *            name of the metric to query
+        * @param startTime
+        *            start of the query window
+        * @param endTime
+        *            end of the query window
+        * @param timeInterval
+        *            period, in seconds, passed to the statistics request
+        * @return the summed metric value, or 0 when no data is available
+        */
+       public int getResponseCountForMetric(String loadBalancerName,
+                       String region, String metricName, Date startTime, Date endTime,
+                       int timeInterval) {
+               int count = 0;
+
+               try {
+                       GetMetricStatisticsRequest request = new GetMetricStatisticsRequest();
+                       request.setMetricName(metricName);
+                       request.setNamespace(Constants.CLOUD_WATCH_NAMESPACE_NAME);
+                       request.setStartTime(startTime);
+                       request.setEndTime(endTime);
+                       request.setPeriod(timeInterval);
+
+                       HashSet<String> statistics = new HashSet<String>();
+                       statistics.add(Constants.SUM_STATISTICS_NAME);
+                       request.setStatistics(statistics);
+
+                       HashSet<Dimension> dimensions = new HashSet<Dimension>();
+                       Dimension loadBalancerDimension = new Dimension();
+                       loadBalancerDimension
+                                       .setName(Constants.LOAD_BALANCER_DIMENTION_NAME);
+                       loadBalancerDimension.setValue(loadBalancerName);
+                       dimensions.add(loadBalancerDimension);
+                       request.setDimensions(dimensions);
+
+                       cloudWatchClient.setEndpoint(String.format(
+                                       Constants.CLOUD_WATCH_ENDPOINT_URL_FORMAT, region));
+
+                       GetMetricStatisticsResult result = cloudWatchClient
+                                       .getMetricStatistics(request);
+
+                       List<Datapoint> dataPoints = result.getDatapoints();
+
+                       if (dataPoints != null) {
+                               // Add up every datapoint; a missing Sum is skipped
+                               // instead of causing a NullPointerException.
+                               double sum = 0;
+                               for (Datapoint dataPoint : dataPoints) {
+                                       Double value = dataPoint.getSum();
+                                       if (value != null) {
+                                               sum += value;
+                                       }
+                               }
+                               count = (int) sum;
+                       }
+
+               } catch (AmazonClientException e) {
+                       log.error("Could not get the statistics for metric " + metricName
+                                       + " of load balancer " + loadBalancerName, e);
+               }
+
+               return count;
+       }
+
+       /**
         * Returns the Listeners required for the service. Listeners are derived
         * from the proxy port, port and protocol values of the service.
         * 

http://git-wip-us.apache.org/repos/asf/stratos/blob/b641e85f/extensions/load-balancer/aws-extension/src/main/java/org/apache/stratos/aws/extension/AWSLoadBalancer.java
----------------------------------------------------------------------
diff --git 
a/extensions/load-balancer/aws-extension/src/main/java/org/apache/stratos/aws/extension/AWSLoadBalancer.java
 
b/extensions/load-balancer/aws-extension/src/main/java/org/apache/stratos/aws/extension/AWSLoadBalancer.java
index c3b80c7..ba04e5c 100644
--- 
a/extensions/load-balancer/aws-extension/src/main/java/org/apache/stratos/aws/extension/AWSLoadBalancer.java
+++ 
b/extensions/load-balancer/aws-extension/src/main/java/org/apache/stratos/aws/extension/AWSLoadBalancer.java
@@ -42,7 +42,7 @@ public class AWSLoadBalancer implements LoadBalancer {
        private static final Log log = LogFactory.getLog(AWSLoadBalancer.class);
 
        // A map <clusterId, load balancer id>
-       private ConcurrentHashMap<String, LoadBalancerInfo> 
clusterIdToLoadBalancerMap;
+       private static ConcurrentHashMap<String, LoadBalancerInfo> 
clusterIdToLoadBalancerMap;
 
        private AWSHelper awsHelper;
 
@@ -272,6 +272,10 @@ public class AWSLoadBalancer implements LoadBalancer {
 
                // Remove domain mappings
        }
+
+       /**
+        * Returns the static cluster-id-to-load-balancer map.
+        * <p>
+        * The map itself is handed out, not a copy, so changes made through the
+        * returned reference are visible to every other user of the map.
+        *
+        * @return the shared {@code clusterIdToLoadBalancerMap} field
+        */
+       public static ConcurrentHashMap<String, LoadBalancerInfo> getClusterIdToLoadBalancerMap() {
+               return AWSLoadBalancer.clusterIdToLoadBalancerMap;
+       }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/stratos/blob/b641e85f/extensions/load-balancer/aws-extension/src/main/java/org/apache/stratos/aws/extension/AWSStatisticsReader.java
----------------------------------------------------------------------
diff --git 
a/extensions/load-balancer/aws-extension/src/main/java/org/apache/stratos/aws/extension/AWSStatisticsReader.java
 
b/extensions/load-balancer/aws-extension/src/main/java/org/apache/stratos/aws/extension/AWSStatisticsReader.java
index f7afc3a..40d51e9 100644
--- 
a/extensions/load-balancer/aws-extension/src/main/java/org/apache/stratos/aws/extension/AWSStatisticsReader.java
+++ 
b/extensions/load-balancer/aws-extension/src/main/java/org/apache/stratos/aws/extension/AWSStatisticsReader.java
@@ -29,8 +29,10 @@ import org.apache.stratos.load.balancer.common.domain.Port;
 import org.apache.stratos.load.balancer.common.domain.Service;
 import 
org.apache.stratos.load.balancer.common.statistics.LoadBalancerStatisticsReader;
 import org.apache.stratos.load.balancer.common.topology.TopologyProvider;
+import 
org.apache.stratos.load.balancer.extension.api.exception.LoadBalancerExtensionException;
 
 import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * AWS statistics reader.
@@ -42,11 +44,16 @@ public class AWSStatisticsReader implements 
LoadBalancerStatisticsReader {
        private TopologyProvider topologyProvider;
        private String clusterInstanceId;
 
-       public AWSStatisticsReader(TopologyProvider topologyProvider) {
+       private AWSHelper awsHelper;
+
+       public AWSStatisticsReader(TopologyProvider topologyProvider)
+                       throws LoadBalancerExtensionException {
                this.topologyProvider = topologyProvider;
                this.clusterInstanceId = System.getProperty(
                                StratosConstants.CLUSTER_INSTANCE_ID,
                                StratosConstants.NOT_DEFINED);
+
+               awsHelper = new AWSHelper();
        }
 
        @Override
@@ -56,7 +63,29 @@ public class AWSStatisticsReader implements 
LoadBalancerStatisticsReader {
 
        @Override
        public int getInFlightRequestCount(String clusterId) {
-               // Find out logic
-               return 0;
+               // The in-flight count is estimated as requests received minus
+               // responses returned over the configured statistics interval.
+               // 0 is returned when no load balancer is known for the cluster.
+               ConcurrentHashMap<String, LoadBalancerInfo> clusterIdToLoadBalancerMap = AWSLoadBalancer
+                               .getClusterIdToLoadBalancerMap();
+
+               // NOTE(review): where the static map is assigned is not visible in
+               // this change, so guard against it not being set yet. A null key
+               // would make ConcurrentHashMap throw, so it is rejected here too.
+               if (clusterIdToLoadBalancerMap == null || clusterId == null) {
+                       return 0;
+               }
+
+               // One lookup instead of containsKey() followed by get(): the map is
+               // shared, so an entry could be removed between the two calls and
+               // get() would then return null.
+               LoadBalancerInfo loadBalancerInfo = clusterIdToLoadBalancerMap
+                               .get(clusterId);
+               if (loadBalancerInfo == null) {
+                       return 0;
+               }
+
+               String loadBalancerName = loadBalancerInfo.getName();
+               String region = loadBalancerInfo.getRegion();
+               int statisticsInterval = awsHelper.getStatisticsInterval();
+
+               int inFlightRequestCount = awsHelper.getRequestCount(
+                               loadBalancerName, region, statisticsInterval)
+                               - awsHelper.getAllResponsesCount(loadBalancerName, region,
+                                               statisticsInterval);
+
+               // A negative difference is reported as 0.
+               return Math.max(inFlightRequestCount, 0);
        }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/b641e85f/extensions/load-balancer/aws-extension/src/main/java/org/apache/stratos/aws/extension/Constants.java
----------------------------------------------------------------------
diff --git 
a/extensions/load-balancer/aws-extension/src/main/java/org/apache/stratos/aws/extension/Constants.java
 
b/extensions/load-balancer/aws-extension/src/main/java/org/apache/stratos/aws/extension/Constants.java
index 0792e00..30ada5c 100644
--- 
a/extensions/load-balancer/aws-extension/src/main/java/org/apache/stratos/aws/extension/Constants.java
+++ 
b/extensions/load-balancer/aws-extension/src/main/java/org/apache/stratos/aws/extension/Constants.java
@@ -23,23 +23,34 @@ package org.apache.stratos.aws.extension;
  * AWS proxy extension constants.
  */
 public class Constants {
-    public static final String CEP_STATS_PUBLISHER_ENABLED = 
"cep.stats.publisher.enabled";
-    public static final String THRIFT_RECEIVER_IP = "thrift.receiver.ip";
-    public static final String THRIFT_RECEIVER_PORT = "thrift.receiver.port";
-    public static final String NETWORK_PARTITION_ID = "network.partition.id";
-    public static final String CLUSTER_ID = "cluster.id";
-    public static final String SERVICE_NAME = "service.name";
-    public static final String AWS_PROPERTIES_FILE="aws.properties.file";
-    public static final String AWS_ACCESS_KEY = "access-key";
-    public static final String AWS_SECRET_KEY = "secret-key";
-    public static final String LB_PREFIX = "load-balancer-prefix";
-    public static final String LOAD_BALANCER_SECURITY_GROUP_NAME = 
"load-balancer-security-group-name";
-    public static final String LOAD_BALANCER_SECURITY_GROUP_DESCRIPTION = 
"Security group for load balancers created for Apache Stratos.";
-    public static final String ELB_ENDPOINT_URL_FORMAT = 
"elasticloadbalancing.%s.amazonaws.com";
-    public static final String EC2_ENDPOINT_URL_FORMAT = 
"ec2.%s.amazonaws.com";
-    public static final String ALLOWED_CIDR_IP_KEY = "allowed-cidr-ip";
-    public static final String ALLOWED_PROTOCOLS = "allowed-protocols";
-    public static final int LOAD_BALANCER_NAME_MAX_LENGTH = 32;
-    public static final int LOAD_BALANCER_PREFIX_MAX_LENGTH = 25;
-    public static final int SECURITY_GROUP_NAME_MAX_LENGTH = 255;
+       public static final String CEP_STATS_PUBLISHER_ENABLED = 
"cep.stats.publisher.enabled";
+       public static final String THRIFT_RECEIVER_IP = "thrift.receiver.ip";
+       public static final String THRIFT_RECEIVER_PORT = 
"thrift.receiver.port";
+       public static final String NETWORK_PARTITION_ID = 
"network.partition.id";
+       public static final String CLUSTER_ID = "cluster.id";
+       public static final String SERVICE_NAME = "service.name";
+       public static final String AWS_PROPERTIES_FILE = "aws.properties.file";
+       public static final String AWS_ACCESS_KEY = "access-key";
+       public static final String AWS_SECRET_KEY = "secret-key";
+       public static final String LB_PREFIX = "load-balancer-prefix";
+       public static final String LOAD_BALANCER_SECURITY_GROUP_NAME = 
"load-balancer-security-group-name";
+       public static final String LOAD_BALANCER_SECURITY_GROUP_DESCRIPTION = 
"Security group for load balancers created for Apache Stratos.";
+       public static final String ELB_ENDPOINT_URL_FORMAT = 
"elasticloadbalancing.%s.amazonaws.com";
+       public static final String EC2_ENDPOINT_URL_FORMAT = 
"ec2.%s.amazonaws.com";
+       public static final String CLOUD_WATCH_ENDPOINT_URL_FORMAT = 
"monitoring.%s.amazonaws.com";
+       public static final String ALLOWED_CIDR_IP_KEY = "allowed-cidr-ip";
+       public static final String ALLOWED_PROTOCOLS = "allowed-protocols";
+       public static final int LOAD_BALANCER_NAME_MAX_LENGTH = 32;
+       public static final int LOAD_BALANCER_PREFIX_MAX_LENGTH = 25;
+       public static final int SECURITY_GROUP_NAME_MAX_LENGTH = 255;
+       public static final String REQUEST_COUNT_METRIC_NAME = "RequestCount";
+       public static final String CLOUD_WATCH_NAMESPACE_NAME = "AWS/ELB";
+       public static final String SUM_STATISTICS_NAME = "Sum";
+       public static final String LOAD_BALANCER_DIMENTION_NAME = 
"LoadBalancerName";
+       public static final String HTTP_RESPONSE_2XX = "HTTPCode_Backend_2XX";
+       public static final String HTTP_RESPONSE_3XX = "HTTPCode_Backend_3XX";
+       public static final String HTTP_RESPONSE_4XX = "HTTPCode_Backend_4XX";
+       public static final String HTTP_RESPONSE_5XX = "HTTPCode_Backend_5XX";
+       public static final String STATISTICS_INTERVAL = "statistics-interval";
+       public static final int STATISTICS_INTERVAL_MULTIPLE_OF = 60;
 }

Reply via email to