Repository: incubator-stratos
Updated Branches:
  refs/heads/master 48e5c0c54 -> 22662cc1f


Fixed in-flight request count management logic for faulted requests and added 
an endpoint-timeout configuration parameter to loadbalancer.conf


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/9d088ef1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/9d088ef1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/9d088ef1

Branch: refs/heads/master
Commit: 9d088ef16b94455359b82e5ff286d725ebb65cc3
Parents: 24cc743
Author: Imesh Gunaratne <[email protected]>
Authored: Sun Apr 13 21:06:37 2014 +0530
Committer: Imesh Gunaratne <[email protected]>
Committed: Sun Apr 13 21:06:37 2014 +0530

----------------------------------------------------------------------
 .../LoadBalancerStatisticsReader.java           |   2 +-
 .../LoadBalancerStatisticsNotifier.java         |   2 +-
 .../conf/LoadBalancerConfiguration.java         |  26 ++++-
 .../load/balancer/conf/util/Constants.java      |   2 +
 .../TenantAwareLoadBalanceEndpoint.java         | 107 ++++++++++++-------
 .../LoadBalancerStatisticsCollector.java        |  70 ++++++------
 .../LoadBalancerStatisticsCollectorTest.java    |  87 +++++++++++++++
 .../sample/configuration/loadbalancer1.conf     |   3 +
 .../sample/configuration/loadbalancer2.conf     |   3 +
 .../sample/configuration/loadbalancer3.conf     |   3 +
 .../extension/HAProxyStatisticsReader.java      |   2 +-
 .../src/main/conf/loadbalancer.conf             |   3 +
 12 files changed, 230 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9d088ef1/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java
 
b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java
index 0e6e265..4a83aee 100644
--- 
a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java
+++ 
b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java
@@ -28,5 +28,5 @@ public interface LoadBalancerStatisticsReader {
      * Get in-flight request count of a sliding window configured e.g. 
Requests in flight of last minute.
      * @param clusterId
      */
-    int getInFlightRequestCountOfSlidingWindow(String clusterId);
+    int getInFlightRequestCount(String clusterId);
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9d088ef1/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java
 
b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java
index 2e94108..4fe2504 100644
--- 
a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java
+++ 
b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java
@@ -78,7 +78,7 @@ public class LoadBalancerStatisticsNotifier implements 
Runnable {
                             for (Cluster cluster : service.getClusters()) {
                                 if (!cluster.isLbCluster()) {
                                     // Publish in-flight request count of load 
balancer's network partition
-                                    requestCount = 
statsReader.getInFlightRequestCountOfSlidingWindow(cluster.getClusterId());
+                                    requestCount = 
statsReader.getInFlightRequestCount(cluster.getClusterId());
                                     
inFlightRequestPublisher.publish(cluster.getClusterId(), networkPartitionId, 
requestCount);
                                     if (log.isDebugEnabled()) {
                                         log.debug(String.format("In-flight 
request count published to cep: [cluster-id] %s [network-partition] %s [value] 
%d",

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9d088ef1/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java
 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java
index 1acf574..5f52411 100644
--- 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java
+++ 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java
@@ -51,6 +51,7 @@ public class LoadBalancerConfiguration {
     private String defaultAlgorithmName;
     private boolean failOverEnabled;
     private boolean sessionAffinityEnabled;
+    private long endpointTimeout;
     private long sessionTimeout;
     private boolean cepStatsPublisherEnabled;
     private String mbIp;
@@ -78,7 +79,7 @@ public class LoadBalancerConfiguration {
     /**
      * Get load balancer configuration singleton instance.
      *
-     * @return
+     * @return Load balancer configuration
      */
     public static LoadBalancerConfiguration getInstance() {
         if (instance == null) {
@@ -130,6 +131,14 @@ public class LoadBalancerConfiguration {
         this.sessionAffinityEnabled = sessionAffinityEnabled;
     }
 
+    public long getEndpointTimeout() {
+        return endpointTimeout;
+    }
+
+    public void setEndpointTimeout(long endpointTimeout) {
+        this.endpointTimeout = endpointTimeout;
+    }
+
     public long getSessionTimeout() {
         return sessionTimeout;
     }
@@ -311,12 +320,27 @@ public class LoadBalancerConfiguration {
             if (StringUtils.isNotBlank(sessionAffinity)) {
                 
configuration.setSessionAffinityEnabled(Boolean.parseBoolean(sessionAffinity));
             }
+
+            String endpointTimeout = 
loadBalancerNode.getProperty(Constants.CONF_PROPERTY_ENDPOINT_TIMEOUT);
+            if (StringUtils.isNotBlank(endpointTimeout)) {
+                
configuration.setEndpointTimeout(Long.parseLong(endpointTimeout));
+            } else {
+                // Endpoint timeout is not found, set default value
+                
configuration.setEndpointTimeout(Constants.DEFAULT_ENDPOINT_TIMEOUT);
+                if(log.isWarnEnabled()) {
+                    log.warn(String.format("Endpoint timeout not found, using 
default: %d", configuration.getEndpointTimeout()));
+                }
+            }
+
             String sessionTimeout = 
loadBalancerNode.getProperty(Constants.CONF_PROPERTY_SESSION_TIMEOUT);
             if (StringUtils.isNotBlank(sessionTimeout)) {
                 
configuration.setSessionTimeout(Long.parseLong(sessionTimeout));
             } else {
                 // Session timeout is not found, set default value
                 
configuration.setSessionTimeout(Constants.DEFAULT_SESSION_TIMEOUT);
+                if(log.isWarnEnabled()) {
+                    log.warn(String.format("Session timeout not found, using 
default: %d", configuration.getSessionTimeout()));
+                }
             }
 
             String topologyEventListenerEnabled = 
loadBalancerNode.getProperty(Constants.CONF_PROPERTY_TOPOLOGY_EVENT_LISTENER);

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9d088ef1/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/util/Constants.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/util/Constants.java
 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/util/Constants.java
index ce6b8e7..72851af 100755
--- 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/util/Constants.java
+++ 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/util/Constants.java
@@ -35,6 +35,7 @@ public class Constants {
     public static final String CONF_PROPERTY_ALGORITHM = "algorithm";
     public static final String CONF_PROPERTY_FAILOVER = "failover";
     public static final String CONF_PROPERTY_SESSION_AFFINITY = 
"session-affinity";
+    public static final String CONF_PROPERTY_ENDPOINT_TIMEOUT = 
"endpoint-timeout";
     public static final String CONF_PROPERTY_SESSION_TIMEOUT = 
"session-timeout";
     public static final String CONF_PROPERTY_TOPOLOGY_EVENT_LISTENER = 
"topology-event-listener";
     public static final String CONF_PROPERTY_TOPOLOGY_MEMBER_IP_TYPE = 
"topology-member-ip-type";
@@ -60,6 +61,7 @@ public class Constants {
     public static final String CONF_PROPERTY_NETWORK_PARTITION_ID = 
"network-partition-id";
 
     public static final String CONF_DELIMITER_HOSTS = ",";
+    public static final long DEFAULT_ENDPOINT_TIMEOUT = 15000;
     public static final long DEFAULT_SESSION_TIMEOUT = 90000;
     public static final String STATIC_NETWORK_PARTITION = 
"static-network-partition";
     public static final String STATIC_PARTITION = "static-partition";

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9d088ef1/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
index 048b26c..3d71a0a 100644
--- 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
+++ 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
@@ -28,6 +28,7 @@ import 
org.apache.stratos.load.balancer.algorithm.LoadBalanceAlgorithmFactory;
 import org.apache.stratos.load.balancer.conf.LoadBalancerConfiguration;
 import org.apache.stratos.load.balancer.conf.domain.MemberIpType;
 import org.apache.stratos.load.balancer.conf.domain.TenantIdentifier;
+import 
org.apache.stratos.load.balancer.statistics.InFlightRequestDecrementCallable;
 import 
org.apache.stratos.load.balancer.statistics.InFlightRequestIncrementCallable;
 import 
org.apache.stratos.load.balancer.statistics.LoadBalancerStatisticsExecutor;
 import org.apache.stratos.load.balancer.util.Constants;
@@ -397,10 +398,13 @@ public class TenantAwareLoadBalanceEndpoint extends 
org.apache.synapse.endpoints
         endpoint.setEnableMBeanStats(false);
         endpoint.setName("DLB:" + member.getHostName() +
                 ":" + member.getPort() + ":" + UUID.randomUUID());
+
         EndpointDefinition definition = new EndpointDefinition();
-        definition.setSuspendMaximumDuration(10000);
+        definition.setTimeoutAction(SynapseConstants.DISCARD_AND_FAULT);
+        
definition.setTimeoutDuration(LoadBalancerConfiguration.getInstance().getEndpointTimeout());
         definition.setReplicationDisabled(true);
         definition.setAddress(to.getAddress());
+
         endpoint.setDefinition(definition);
         endpoint.init((SynapseEnvironment)
                 ((Axis2MessageContext) synCtx).getAxis2MessageContext().
@@ -510,13 +514,12 @@ public class TenantAwareLoadBalanceEndpoint extends 
org.apache.synapse.endpoints
 
         Endpoint endpoint = getEndpoint(to, currentMember, synCtx);
 
-        if (isFailover()) {
-            faultHandler.setTo(to);
-            faultHandler.setCurrentMember(currentMember);
-            faultHandler.setCurrentEp(endpoint);
-            synCtx.pushFaultHandler(faultHandler);
-            synCtx.getEnvelope().build();
-        }
+        // Push fault handler to manage statistics and fail-over logic
+        faultHandler.setTo(to);
+        faultHandler.setCurrentMember(currentMember);
+        faultHandler.setCurrentEp(endpoint);
+        synCtx.pushFaultHandler(faultHandler);
+        synCtx.getEnvelope().build();
 
         if (isSessionAffinityBasedLB()) {
             
synCtx.setProperty(SynapseConstants.PROP_SAL_ENDPOINT_DEFAULT_SESSION_TIMEOUT, 
getSessionTimeout());
@@ -575,6 +578,22 @@ public class TenantAwareLoadBalanceEndpoint extends 
org.apache.synapse.endpoints
         }
     }
 
+    private void decrementInFlightRequestCount(MessageContext messageContext) {
+        try {
+            String clusterId = (String) 
messageContext.getProperty(Constants.CLUSTER_ID);
+            if(StringUtils.isBlank(clusterId)) {
+                throw new RuntimeException("Cluster id not found in message 
context");
+            }
+            FutureTask<Object> task = new FutureTask<Object>(new 
InFlightRequestDecrementCallable(clusterId));
+            
LoadBalancerStatisticsExecutor.getInstance().getService().submit(task);
+        }
+        catch (Exception e) {
+            if(log.isDebugEnabled()) {
+                log.debug("Could not decrement in-flight request count", e);
+            }
+        }
+    }
+
     public void setDispatcher(HttpSessionDispatcher dispatcher) {
         this.dispatcher = dispatcher;
     }
@@ -634,41 +653,57 @@ public class TenantAwareLoadBalanceEndpoint extends 
org.apache.synapse.endpoints
 
         @Override
         public void onFault(MessageContext synCtx) {
-            // Cleanup endpoint if exists
-            if (currentEp != null) {
-                currentEp.destroy();
-            }
-            if (currentMember == null) {
-                return;
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("A fault detected in message sent to: 
%s ", (to != null) ? to.getAddress() : "address not found"));
             }
 
-            // Add current member to faulty members
-            faultyMembers.put(currentMember.getHostName(), true);
+            // Decrement in-flight request count
+            decrementInFlightRequestCount(synCtx);
 
-            currentMember = findNextMember(synCtx);
-            if (currentMember == null) {
-                String msg = String.format("No application members available 
to serve the request %s", synCtx.getTo().getAddress());
-                if (log.isErrorEnabled()) {
-                    log.error(msg);
+            if (isFailover()) {
+                if(log.isDebugEnabled()) {
+                    log.debug("Fail-over enabled, trying to send the message 
to the next available member");
+                }
+
+                // Cleanup endpoint if exists
+                if (currentEp != null) {
+                    currentEp.destroy();
+                }
+                if (currentMember == null) {
+                    if(log.isErrorEnabled()) {
+                        log.error("Current member is null, could not 
fail-over");
+                    }
+                    return;
                 }
-                throwSynapseException(synCtx, 404, msg);
-            }
-            if (faultyMembers.containsKey(currentMember.getHostName())) {
-                // This member has been identified as faulty previously. It 
implies that
-                // this request could not be served by any of the members in 
the cluster.
-                throwSynapseException(synCtx, 404, String.format("Requested 
resource could not be found"));
-            }
 
-            synCtx.setTo(to);
-            if (isSessionAffinityBasedLB()) {
-                //We are sending the this message on a new session,
-                // hence we need to remove previous session information
-                Set pros = synCtx.getPropertyKeySet();
-                if (pros != null) {
-                    
pros.remove(SynapseConstants.PROP_SAL_CURRENT_SESSION_INFORMATION);
+                // Add current member to faulty members
+                faultyMembers.put(currentMember.getHostName(), true);
+
+                currentMember = findNextMember(synCtx);
+                if (currentMember == null) {
+                    String msg = String.format("No members available to serve 
the request %s", (to != null) ? to.getAddress() : "address not found");
+                    if (log.isErrorEnabled()) {
+                        log.error(msg);
+                    }
+                    throwSynapseException(synCtx, 404, msg);
+                }
+                if (faultyMembers.containsKey(currentMember.getHostName())) {
+                    // This member has been identified as faulty previously. 
It implies that
+                    // this request could not be served by any of the members 
in the cluster.
+                    throwSynapseException(synCtx, 404, 
String.format("Requested resource could not be found"));
+                }
+
+                synCtx.setTo(to);
+                if (isSessionAffinityBasedLB()) {
+                    //We are sending the this message on a new session,
+                    // hence we need to remove previous session information
+                    Set pros = synCtx.getPropertyKeySet();
+                    if (pros != null) {
+                        
pros.remove(SynapseConstants.PROP_SAL_CURRENT_SESSION_INFORMATION);
+                    }
                 }
+                sendToApplicationMember(synCtx, currentMember, this, true);
             }
-            sendToApplicationMember(synCtx, currentMember, this, true);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9d088ef1/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java
 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java
index b49fcbf..3557d3a 100644
--- 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java
+++ 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java
@@ -33,11 +33,11 @@ public class LoadBalancerStatisticsCollector implements 
LoadBalancerStatisticsRe
     private static final Log log = 
LogFactory.getLog(LoadBalancerStatisticsCollector.class);
 
     private static volatile LoadBalancerStatisticsCollector instance;
-    // Map<ClusterId, ArrayList<Date>>
-    private Map<String, List<Date>> inFlightRequestToDateListMap;
+    // Map<ClusterId, Integer>
+    private Map<String, Integer> clusterIdRequestCountMap;
 
     private LoadBalancerStatisticsCollector() {
-        inFlightRequestToDateListMap = new ConcurrentHashMap<String, 
List<Date>>();
+        clusterIdRequestCountMap = new ConcurrentHashMap<String, Integer>();
     }
 
     public static LoadBalancerStatisticsCollector getInstance() {
@@ -54,28 +54,22 @@ public class LoadBalancerStatisticsCollector implements 
LoadBalancerStatisticsRe
         return instance;
     }
 
-    public int getInFlightRequestCountOfSlidingWindow(String clusterId) {
+    /**
+     * Clear load balancer statistics collector singleton instance.
+     */
+    public static void clear() {
         synchronized (LoadBalancerStatisticsCollector.class) {
-            // Sliding window in milliseconds
-            int slidingWindow = 10000; // TODO Move this to loadbalancer.conf
+            instance = null;
+        }
+    }
 
-            if (inFlightRequestToDateListMap.containsKey(clusterId)) {
-                List<Date> dateList = 
inFlightRequestToDateListMap.get(clusterId);
-                List<Date> updatedList = Collections.synchronizedList(new 
ArrayList<Date>());
-                Date currentDate = new Date();
-                long slidingWindStart = currentDate.getTime() - slidingWindow;
-                int count = 0;
-                for (Date date : dateList) {
-                    if (date.getTime() > slidingWindStart) {
-                        count++;
-                    }
-                    else {
-                        updatedList.add(date);
-                    }
+    public int getInFlightRequestCount(String clusterId) {
+        synchronized (LoadBalancerStatisticsCollector.class) {
+            if (clusterIdRequestCountMap.containsKey(clusterId)) {
+                Integer count = clusterIdRequestCountMap.get(clusterId);
+                if(count != null) {
+                    return count;
                 }
-                // Remove dates counted
-                inFlightRequestToDateListMap.put(clusterId, updatedList);
-                return count;
             }
             return 0;
         }
@@ -89,18 +83,16 @@ public class LoadBalancerStatisticsCollector implements 
LoadBalancerStatisticsRe
                 }
                 return;
             }
-            List<Date> dateList;
-            if (inFlightRequestToDateListMap.containsKey(clusterId)) {
-                dateList = inFlightRequestToDateListMap.get(clusterId);
-            } else {
-                dateList = Collections.synchronizedList(new ArrayList<Date>());
-                inFlightRequestToDateListMap.put(clusterId, dateList);
+            Integer count = 0;
+            if (clusterIdRequestCountMap.containsKey(clusterId)) {
+                count = clusterIdRequestCountMap.get(clusterId);
             }
-            // Add current date to cluster date list
-            dateList.add(new Date());
+            count++;
+            clusterIdRequestCountMap.put(clusterId, count);
+
             if (log.isDebugEnabled()) {
                 log.debug(String.format("In-flight request count incremented: 
[cluster] %s [count] %s ", clusterId,
-                        dateList.size()));
+                        count));
 
             }
         }
@@ -115,22 +107,20 @@ public class LoadBalancerStatisticsCollector implements 
LoadBalancerStatisticsRe
                 return;
             }
 
-            if (!inFlightRequestToDateListMap.containsKey(clusterId)) {
+            if (!clusterIdRequestCountMap.containsKey(clusterId)) {
                 if (log.isDebugEnabled()) {
-                    log.debug(String.format("In-flight request date list not 
found for cluster: [cluster] %s ", clusterId));
+                    log.debug(String.format("In-flight request count not found 
for cluster, could not decrement in-flight request count: [cluster] %s ", 
clusterId));
                 }
             } else {
-                List<Date> dateList = 
inFlightRequestToDateListMap.get(clusterId);
-                if (!dateList.isEmpty()) {
-                    int index = dateList.size() - 1;
-                    if (index >= 0) {
-                        dateList.remove(index);
-                    }
+                Integer count = clusterIdRequestCountMap.get(clusterId);
+                if (count != null) {
+                    count = (count >= 1) ? (count - 1) : 0;
                 }
+                clusterIdRequestCountMap.put(clusterId, count);
 
                 if (log.isDebugEnabled()) {
                     log.debug(String.format("In-flight request count 
decremented: [cluster] %s [count] %s ", clusterId,
-                            dateList.size()));
+                            count));
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9d088ef1/components/org.apache.stratos.load.balancer/src/test/java/org/apache/stratos/load/balancer/test/LoadBalancerStatisticsCollectorTest.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer/src/test/java/org/apache/stratos/load/balancer/test/LoadBalancerStatisticsCollectorTest.java
 
b/components/org.apache.stratos.load.balancer/src/test/java/org/apache/stratos/load/balancer/test/LoadBalancerStatisticsCollectorTest.java
new file mode 100644
index 0000000..26b5f34
--- /dev/null
+++ 
b/components/org.apache.stratos.load.balancer/src/test/java/org/apache/stratos/load/balancer/test/LoadBalancerStatisticsCollectorTest.java
@@ -0,0 +1,87 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+
+ *  http://www.apache.org/licenses/LICENSE-2.0
+
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.stratos.load.balancer.test;
+
+import 
org.apache.stratos.load.balancer.statistics.InFlightRequestIncrementCallable;
+import 
org.apache.stratos.load.balancer.statistics.LoadBalancerStatisticsCollector;
+import org.junit.Assert;
+import 
org.apache.stratos.load.balancer.statistics.InFlightRequestDecrementCallable;
+import 
org.apache.stratos.load.balancer.statistics.LoadBalancerStatisticsExecutor;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+
+/**
+ * Load balancer statistics collector tests.
+ */
+@RunWith(JUnit4.class)
+public class LoadBalancerStatisticsCollectorTest {
+
+    /***
+     * Test in-flight request count calculation.
+     */
+    @Test
+    public void testInFlightRequestCountCalculation() {
+        String clusterId = "cluster1";
+        String incrementErrorMessage = "Could not increment in-flight request 
count: ";
+        String decrementErrorMessage = "Could not decrement in-flight request 
count: ";
+
+        FutureTask<Object> task = new FutureTask<Object>(new 
InFlightRequestIncrementCallable(clusterId));
+        executeTask(task);
+        Assert.assertEquals(incrementErrorMessage, 1, 
LoadBalancerStatisticsCollector.getInstance().getInFlightRequestCount(clusterId));
+
+        task = new FutureTask<Object>(new 
InFlightRequestIncrementCallable(clusterId));
+        executeTask(task);
+        Assert.assertEquals(incrementErrorMessage, 2, 
LoadBalancerStatisticsCollector.getInstance().getInFlightRequestCount(clusterId));
+
+        task = new FutureTask<Object>(new 
InFlightRequestIncrementCallable(clusterId));
+        executeTask(task);
+        Assert.assertEquals(incrementErrorMessage, 3, 
LoadBalancerStatisticsCollector.getInstance().getInFlightRequestCount(clusterId));
+
+        task = new FutureTask<Object>(new 
InFlightRequestDecrementCallable(clusterId));
+        executeTask(task);
+        Assert.assertEquals(decrementErrorMessage, 2, 
LoadBalancerStatisticsCollector.getInstance().getInFlightRequestCount(clusterId));
+
+        task = new FutureTask<Object>(new 
InFlightRequestDecrementCallable(clusterId));
+        executeTask(task);
+        Assert.assertEquals(decrementErrorMessage, 1, 
LoadBalancerStatisticsCollector.getInstance().getInFlightRequestCount(clusterId));
+
+        task = new FutureTask<Object>(new 
InFlightRequestDecrementCallable(clusterId));
+        executeTask(task);
+        Assert.assertEquals(decrementErrorMessage, 0, 
LoadBalancerStatisticsCollector.getInstance().getInFlightRequestCount(clusterId));
+
+        LoadBalancerStatisticsCollector.clear();
+    }
+
+    private void executeTask(FutureTask<Object> task) {
+        Future future = 
LoadBalancerStatisticsExecutor.getInstance().getService().submit(task);
+        while(!future.isDone()) {
+            // Wait until task get executed
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {
+                // Might not need to trace
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9d088ef1/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer1.conf
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer1.conf
 
b/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer1.conf
index 0d315de..ff11dac 100755
--- 
a/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer1.conf
+++ 
b/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer1.conf
@@ -37,6 +37,9 @@ loadbalancer {
     # incoming requests to members with same sessions.
     session-affinity: true;
 
+    # Endpoint timeout in milli-seconds
+    endpoint-timeout: 15000;
+
     # Session timeout in milli-seconds
     session-timeout: 90000;
 

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9d088ef1/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer2.conf
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer2.conf
 
b/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer2.conf
index 37325d1..9910dc9 100755
--- 
a/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer2.conf
+++ 
b/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer2.conf
@@ -37,6 +37,9 @@ loadbalancer {
     # incoming requests to members with same sessions.
     session-affinity: true;
 
+    # Endpoint timeout in milli-seconds
+    endpoint-timeout: 15000;
+
     # Session timeout in milli-seconds
     session-timeout: 90000;
 

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9d088ef1/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer3.conf
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer3.conf
 
b/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer3.conf
index b5d915f..a629a6f 100755
--- 
a/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer3.conf
+++ 
b/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer3.conf
@@ -37,6 +37,9 @@ loadbalancer {
     # incoming requests to members with same sessions.
     session-affinity: true;
 
+    # Endpoint timeout in milli-seconds
+    endpoint-timeout: 15000;
+
     # Session timeout in milli-seconds
     session-timeout: 90000;
 

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9d088ef1/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java
----------------------------------------------------------------------
diff --git 
a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java
 
b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java
index 4c35537..b38aa3c 100644
--- 
a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java
+++ 
b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java
@@ -46,7 +46,7 @@ public class HAProxyStatisticsReader implements 
LoadBalancerStatisticsReader {
     }
 
     @Override
-    public int getInFlightRequestCountOfSlidingWindow(String clusterId) {
+    public int getInFlightRequestCount(String clusterId) {
         String frontendId, backendId, command, output;
         String[] array;
         int totalWeight, weight;

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9d088ef1/products/load-balancer/modules/distribution/src/main/conf/loadbalancer.conf
----------------------------------------------------------------------
diff --git 
a/products/load-balancer/modules/distribution/src/main/conf/loadbalancer.conf 
b/products/load-balancer/modules/distribution/src/main/conf/loadbalancer.conf
index 7b0d511..7f61b8a 100644
--- 
a/products/load-balancer/modules/distribution/src/main/conf/loadbalancer.conf
+++ 
b/products/load-balancer/modules/distribution/src/main/conf/loadbalancer.conf
@@ -31,6 +31,9 @@ loadbalancer {
     # incoming requests to members with same sessions.
     session-affinity: true;
 
+    # Endpoint timeout in milli-seconds
+    endpoint-timeout: 15000;
+
     # Session timeout in milli-seconds
     session-timeout: 90000;
 

Reply via email to