Updated Branches:
  refs/heads/master 442be445f -> 7d8a8b3cd

Fixing requests in flight decrement issue


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/7d8a8b3c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/7d8a8b3c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/7d8a8b3c

Branch: refs/heads/master
Commit: 7d8a8b3cdfb112affc230c6716d71a24e0a35c0a
Parents: 442be44
Author: Lahiru Sandaruwan <[email protected]>
Authored: Wed Jan 15 00:16:11 2014 +0530
Committer: Lahiru Sandaruwan <[email protected]>
Committed: Wed Jan 15 00:16:11 2014 +0530

----------------------------------------------------------------------
 .../LoadBalancerStatisticsReader.java           |   6 +-
 .../LoadBalancerStatisticsNotifier.java         |   2 +-
 .../TenantAwareLoadBalanceEndpoint.java         |   2 +-
 .../balancer/mediators/ResponseInterceptor.java |   2 +-
 .../LoadBalancerStatisticsCollector.java        | 112 +++++++++++++------
 .../extension/HAProxyStatisticsReader.java      |   2 +-
 6 files changed, 86 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7d8a8b3c/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java
 
b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java
index 41e81e8..0e6e265 100644
--- 
a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java
+++ 
b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java
@@ -19,16 +19,14 @@
 
 package org.apache.stratos.load.balancer.common.statistics;
 
-import java.util.HashMap;
-
 /**
  * Load balancer statistics reader interface.
  */
 public interface LoadBalancerStatisticsReader {
 
     /**
-     * Get in-flight request count of a given cluster.
+     * Get in-flight request count of a sliding window configured e.g. 
Requests in flight of last minute.
      * @param clusterId
      */
-    int getInFlightRequestCount(String clusterId);
+    int getInFlightRequestCountOfSlidingWindow(String clusterId);
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7d8a8b3c/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java
 
b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java
index 7268695..ebef3f8 100644
--- 
a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java
+++ 
b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java
@@ -78,7 +78,7 @@ public class LoadBalancerStatisticsNotifier implements 
Runnable {
                             for (Cluster cluster : service.getClusters()) {
                                 if (!cluster.isLbCluster()) {
                                     // Publish in-flight request count of load 
balancer's network partition
-                                    requestCount = 
statsReader.getInFlightRequestCount(cluster.getClusterId());
+                                    requestCount = 
statsReader.getInFlightRequestCountOfSlidingWindow(cluster.getClusterId());
                                     
inFlightRequestPublisher.publish(cluster.getClusterId(), networkPartitionId, 
requestCount);
                                     if (log.isDebugEnabled()) {
                                         log.debug(String.format("In-flight 
request count published to cep: [cluster-id] %s [network-partition] %s [value] 
%d",

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7d8a8b3c/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
index efc88c9..05417b3 100644
--- 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
+++ 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
@@ -526,7 +526,7 @@ public class TenantAwareLoadBalanceEndpoint extends 
org.apache.synapse.endpoints
             if(StringUtils.isBlank(clusterId)) {
                 throw new RuntimeException("Cluster id not found in message 
context");
             }
-            
LoadBalancerStatisticsCollector.getInstance().incrementInFlightRequestCount(clusterId);
+            
LoadBalancerStatisticsCollector.getInstance().addAnInFlightRequest(clusterId);
         }
         catch (Exception e) {
             if(log.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7d8a8b3c/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java
 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java
index cf3e768..e350840 100644
--- 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java
+++ 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java
@@ -40,7 +40,7 @@ public class ResponseInterceptor extends AbstractMediator 
implements ManagedLife
             if (StringUtils.isBlank(clusterId)) {
                 throw new RuntimeException("Cluster id not found in message 
context");
             }
-            
LoadBalancerStatisticsCollector.getInstance().decrementInFlightRequestCount(clusterId);
+            
LoadBalancerStatisticsCollector.getInstance().removeAnInFlightRequest(clusterId);
         } catch (Exception e) {
             if(log.isErrorEnabled()) {
                 log.error("Could not decrement in-flight request count", e);

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7d8a8b3c/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java
 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java
index 72186fe..cff081b 100644
--- 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java
+++ 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java
@@ -1,18 +1,18 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one 
+ * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
  * regarding copyright ownership.  The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *  http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
- * KIND, either express or implied.  See the License for the 
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
  */
@@ -23,7 +23,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import 
org.apache.stratos.load.balancer.common.statistics.LoadBalancerStatisticsReader;
 
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
@@ -34,10 +34,10 @@ public class LoadBalancerStatisticsCollector implements 
LoadBalancerStatisticsRe
 
     private static volatile LoadBalancerStatisticsCollector instance;
     // Map<ClusterId, Map<PartitionId, InFlightRequestCount>
-    private Map<String, Integer> inFlightRequestCountMap;
+    private Map<String, Vector<Date>> inFlightRequestToDateListMap;
 
     private LoadBalancerStatisticsCollector() {
-        inFlightRequestCountMap = new ConcurrentHashMap<String, Integer>();
+        inFlightRequestToDateListMap = new ConcurrentHashMap<String, 
Vector<Date>>();
     }
 
     public static LoadBalancerStatisticsCollector getInstance() {
@@ -54,48 +54,96 @@ public class LoadBalancerStatisticsCollector implements 
LoadBalancerStatisticsRe
         return instance;
     }
 
-    public int getInFlightRequestCount(String clusterId) {
-        if (inFlightRequestCountMap.containsKey(clusterId)) {
-            return inFlightRequestCountMap.get(clusterId);
+    public int getInFlightRequestCountOfSlidingWindow(String clusterId) {
+        //Clear the list before returning...
+
+        //Sliding window in seconds
+        int slidingWindow = 60;//TODO get this from a config
+
+        if (inFlightRequestToDateListMap.containsKey(clusterId)) {
+            Vector<Date> vector = inFlightRequestToDateListMap.get(clusterId);
+            Iterator<Date> itr = vector.iterator();
+            while(itr.hasNext()){
+                Date date = itr.next();
+                Date currentDate = new Date();
+                if((currentDate.getTime() - date.getTime()) > slidingWindow){ 
// we will remove the
+                    itr.remove();
+                } else {
+                    //If the
+                    break;
+                }
+            }
+            return inFlightRequestToDateListMap.get(clusterId).size();
         }
         return 0;
     }
 
-    public void setInFlightRequestCount(String clusterId, int value) {
+    public void addAnInFlightRequest(String clusterId) {
         if (StringUtils.isBlank(clusterId)) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Cluster id is blank which try to set 
requests in flight" +
+                        " : [cluster] %s", clusterId));
+            }
             return;
         }
-
-        inFlightRequestCountMap.put(clusterId, value);
+        if (!inFlightRequestToDateListMap.containsKey(clusterId)) {
+            Vector<Date> list = inFlightRequestToDateListMap.get(clusterId);
+            list.add(new Date());
+            inFlightRequestToDateListMap.put(clusterId, list);
+        } else {
+            inFlightRequestToDateListMap.get(clusterId).add(new Date());
+        }
         if (log.isDebugEnabled()) {
-            log.debug(String.format("In-flight request count updated: 
[cluster] %s [value] %d", clusterId, value));
+            log.debug(String.format("In-flight request count updated: 
[cluster] %s ", clusterId));
         }
     }
 
-    public void incrementInFlightRequestCount(String clusterId) {
-        incrementInFlightRequestCount(clusterId, 1);
-    }
 
-    public void incrementInFlightRequestCount(String clusterId, int value) {
+    public void removeAnInFlightRequest(String clusterId) {
         if (StringUtils.isBlank(clusterId)) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Cluster id is blank which try to 
remove a requests in flight" +
+                        " : [cluster] %s ", clusterId));
+            }
             return;
         }
+        if (!inFlightRequestToDateListMap.containsKey(clusterId)) {
 
-        int count = getInFlightRequestCount(clusterId);
-        setInFlightRequestCount(clusterId, (count + value));
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("In-flight list not available for 
cluster : [cluster] %s ", clusterId));
+            }
+        } else {
+            inFlightRequestToDateListMap.remove(clusterId);
+        }
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("In-flight request count updated: 
[cluster] %s ", clusterId));
+        }
     }
 
-    public void decrementInFlightRequestCount(String clusterId) {
-        decrementInFlightRequestCount(clusterId, 1);
-    }
+//    public void incrementInFlightRequestCount(String clusterId) {
+//        incrementInFlightRequestCount(clusterId, 1);
+//    }
 
-    public void decrementInFlightRequestCount(String clusterId, int value) {
-        if (StringUtils.isBlank(clusterId)) {
-            return;
-        }
+//    private void incrementInFlightRequestCount(String clusterId, int value) {
+//        if (StringUtils.isBlank(clusterId)) {
+//            return;
+//        }
+//
+//        int count = getInFlightRequestCount(clusterId);
+//        addAnInFlightRequest(clusterId, (count + value));
+//    }
 
-        int count = getInFlightRequestCount(clusterId);
-        int newValue = (count - value) < 0 ? 0 : (count - value);
-        setInFlightRequestCount(clusterId, newValue);
-    }
-}
+//    public void decrementInFlightRequestCount(String clusterId) {
+//        decrementInFlightRequestCount(clusterId, 1);
+//    }
+
+//    private void decrementInFlightRequestCount(String clusterId, int value) {
+//        if (StringUtils.isBlank(clusterId)) {
+//            return;
+//        }
+//
+//        int count = getInFlightRequestCount(clusterId);
+//        int newValue = (count - value) < 0 ? 0 : (count - value);
+//        addAnInFlightRequest(clusterId, newValue);
+//    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7d8a8b3c/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java
----------------------------------------------------------------------
diff --git 
a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java
 
b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java
index b38aa3c..4c35537 100644
--- 
a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java
+++ 
b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java
@@ -46,7 +46,7 @@ public class HAProxyStatisticsReader implements 
LoadBalancerStatisticsReader {
     }
 
     @Override
-    public int getInFlightRequestCount(String clusterId) {
+    public int getInFlightRequestCountOfSlidingWindow(String clusterId) {
         String frontendId, backendId, command, output;
         String[] array;
         int totalWeight, weight;

Reply via email to