Repository: incubator-stratos
Updated Branches:
  refs/heads/master ac003748a -> 7eff3e8d6


Implemented new functionality to execute statistics update calls using an 
executor service


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/b1ae4e25
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/b1ae4e25
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/b1ae4e25

Branch: refs/heads/master
Commit: b1ae4e2585f736ae7221da6bad1880c4d65c947e
Parents: 53f1fc8
Author: Imesh Gunaratne <[email protected]>
Authored: Thu Feb 27 02:01:31 2014 -0500
Committer: Imesh Gunaratne <[email protected]>
Committed: Thu Feb 27 02:01:31 2014 -0500

----------------------------------------------------------------------
 .../TenantAwareLoadBalanceEndpoint.java         |   7 +-
 .../balancer/mediators/ResponseInterceptor.java |  22 +--
 .../InFlightRequestDecrementCallable.java       |  38 +++++
 .../InFlightRequestIncrementCallable.java       |  38 +++++
 .../LoadBalancerStatisticsCollector.java        | 159 ++++++++-----------
 .../LoadBalancerStatisticsExecutor.java         |  56 +++++++
 6 files changed, 214 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/b1ae4e25/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
index aeeb3a1..b88848e 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
@@ -28,7 +28,8 @@ import org.apache.stratos.load.balancer.algorithm.LoadBalanceAlgorithmFactory;
 import org.apache.stratos.load.balancer.conf.LoadBalancerConfiguration;
 import org.apache.stratos.load.balancer.conf.domain.MemberIpType;
 import org.apache.stratos.load.balancer.conf.domain.TenantIdentifier;
-import org.apache.stratos.load.balancer.statistics.LoadBalancerStatisticsCollector;
+import org.apache.stratos.load.balancer.statistics.InFlightRequestIncrementCallable;
+import org.apache.stratos.load.balancer.statistics.LoadBalancerStatisticsExecutor;
 import org.apache.stratos.load.balancer.util.Constants;
 import org.apache.stratos.messaging.domain.tenant.Tenant;
 import org.apache.stratos.messaging.domain.topology.Member;
@@ -51,6 +52,7 @@ import java.io.Serializable;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.util.*;
+import java.util.concurrent.FutureTask;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -559,7 +561,8 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints
             if(StringUtils.isBlank(clusterId)) {
                 throw new RuntimeException("Cluster id not found in message context");
             }
-            LoadBalancerStatisticsCollector.getInstance().addAnInFlightRequest(clusterId);
+            FutureTask<Object> task = new FutureTask<Object>(new InFlightRequestIncrementCallable(clusterId));
+            LoadBalancerStatisticsExecutor.getInstance().getService().submit(task);
         }
         catch (Exception e) {
             if(log.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/b1ae4e25/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java
index 7ca6c8d..3913248 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java
@@ -19,18 +19,28 @@
 package org.apache.stratos.load.balancer.mediators;
 
 import org.apache.commons.lang3.StringUtils;
-import org.apache.stratos.load.balancer.statistics.LoadBalancerStatisticsCollector;
+import org.apache.stratos.load.balancer.statistics.InFlightRequestDecrementCallable;
+import org.apache.stratos.load.balancer.statistics.LoadBalancerStatisticsExecutor;
 import org.apache.stratos.load.balancer.util.Constants;
 import org.apache.synapse.ManagedLifecycle;
 import org.apache.synapse.MessageContext;
 import org.apache.synapse.core.SynapseEnvironment;
 import org.apache.synapse.mediators.AbstractMediator;
 
+import java.util.concurrent.FutureTask;
+
 /**
  * This Synapse mediator counts the responses that are going across LB.
  */
 public class ResponseInterceptor extends AbstractMediator implements ManagedLifecycle {
 
+    @Override
+    public void init(SynapseEnvironment arg) {
+        if (log.isDebugEnabled()) {
+            log.debug("ResponseInterceptor mediator initiated");
+        }
+    }
+
     public boolean mediate(MessageContext messageContext) {
         try {
             if (log.isDebugEnabled()) {
@@ -38,7 +48,8 @@ public class ResponseInterceptor extends AbstractMediator implements ManagedLife
             }
             String clusterId = (String) messageContext.getProperty(Constants.CLUSTER_ID);
             if (StringUtils.isNotBlank(clusterId)) {
-               LoadBalancerStatisticsCollector.getInstance().removeAnInFlightRequest(clusterId);
+                FutureTask<Object> task = new FutureTask<Object>(new InFlightRequestDecrementCallable(clusterId));
+                LoadBalancerStatisticsExecutor.getInstance().getService().submit(task);
             } else{
                if (log.isDebugEnabled()) {
                     log.debug("Could not decrement in-flight request count : cluster id not found in message context");
@@ -59,11 +70,4 @@ public class ResponseInterceptor extends AbstractMediator implements ManagedLife
             log.debug("Response interceptor mediator destroyed");
         }
     }
-
-    @Override
-    public void init(SynapseEnvironment arg0) {
-        if (log.isDebugEnabled()) {
-            log.debug("ResponseInterceptor mediator initiated");
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/b1ae4e25/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/InFlightRequestDecrementCallable.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/InFlightRequestDecrementCallable.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/InFlightRequestDecrementCallable.java
new file mode 100644
index 0000000..a880d32
--- /dev/null
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/InFlightRequestDecrementCallable.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.load.balancer.statistics;
+
+import java.util.concurrent.Callable;
+
+/**
+ * In-flight request decrement callable definition.
+ */
+public class InFlightRequestDecrementCallable implements Callable {
+    private String clusterId;
+
+    public InFlightRequestDecrementCallable(String clusterId) {
+        this.clusterId = clusterId;
+    }
+
+    @Override
+    public Object call() throws Exception {
+        LoadBalancerStatisticsCollector.getInstance().decrementInFlightRequestCount(clusterId);
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/b1ae4e25/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/InFlightRequestIncrementCallable.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/InFlightRequestIncrementCallable.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/InFlightRequestIncrementCallable.java
new file mode 100644
index 0000000..e670eab
--- /dev/null
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/InFlightRequestIncrementCallable.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.load.balancer.statistics;
+
+import java.util.concurrent.Callable;
+
+/**
+ * In-flight request increment callable definition.
+ */
+public class InFlightRequestIncrementCallable implements Callable {
+    private String clusterId;
+
+    public InFlightRequestIncrementCallable(String clusterId) {
+        this.clusterId = clusterId;
+    }
+
+    @Override
+    public Object call() throws Exception {
+        LoadBalancerStatisticsCollector.getInstance().incrementInFlightRequestCount(clusterId);
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/b1ae4e25/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java
index f522a27..b49fcbf 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java
@@ -33,18 +33,18 @@ public class LoadBalancerStatisticsCollector implements LoadBalancerStatisticsRe
     private static final Log log = LogFactory.getLog(LoadBalancerStatisticsCollector.class);
 
     private static volatile LoadBalancerStatisticsCollector instance;
-    // Map<ClusterId, Map<PartitionId, InFlightRequestCount>
-    private Map<String, Vector<Date>> inFlightRequestToDateListMap;
+    // Map<ClusterId, ArrayList<Date>>
+    private Map<String, List<Date>> inFlightRequestToDateListMap;
 
     private LoadBalancerStatisticsCollector() {
-        inFlightRequestToDateListMap = new ConcurrentHashMap<String, Vector<Date>>();
+        inFlightRequestToDateListMap = new ConcurrentHashMap<String, List<Date>>();
     }
 
     public static LoadBalancerStatisticsCollector getInstance() {
         if (instance == null) {
             synchronized (LoadBalancerStatisticsCollector.class) {
                 if (instance == null) {
-                    if(log.isDebugEnabled()) {
+                    if (log.isDebugEnabled()) {
                         log.debug("Load balancer in-flight request count collector instance created");
                     }
                     instance = new LoadBalancerStatisticsCollector();
@@ -55,115 +55,84 @@ public class LoadBalancerStatisticsCollector implements LoadBalancerStatisticsRe
     }
 
     public int getInFlightRequestCountOfSlidingWindow(String clusterId) {
-        //Clear the list before returning...
+        synchronized (LoadBalancerStatisticsCollector.class) {
+            // Sliding window in milliseconds
+            int slidingWindow = 10000; // TODO Move this to loadbalancer.conf
 
-        //Sliding window in Milliseconds
-        int slidingWindow = 60000;//TODO get this from a config
-
-
-        if (inFlightRequestToDateListMap.containsKey(clusterId)) {
-            Vector<Date> vector = inFlightRequestToDateListMap.get(clusterId);
-            Iterator<Date> itr = vector.iterator();
-            while(itr.hasNext()){
-                Date date = itr.next();
+            if (inFlightRequestToDateListMap.containsKey(clusterId)) {
+                List<Date> dateList = inFlightRequestToDateListMap.get(clusterId);
+                List<Date> updatedList = Collections.synchronizedList(new ArrayList<Date>());
                 Date currentDate = new Date();
-                if((currentDate.getTime() - date.getTime()) > slidingWindow){ // we will remove the
-                    itr.remove();
-                } else {
-                    //If the
-                    break;
+                long slidingWindStart = currentDate.getTime() - slidingWindow;
+                int count = 0;
+                for (Date date : dateList) {
+                    if (date.getTime() > slidingWindStart) {
+                        count++;
+                    }
+                    else {
+                        updatedList.add(date);
+                    }
                 }
+                // Remove dates counted
+                inFlightRequestToDateListMap.put(clusterId, updatedList);
+                return count;
             }
-            return inFlightRequestToDateListMap.get(clusterId).size();
+            return 0;
         }
-        return 0;
     }
 
-    public void addAnInFlightRequest(String clusterId) {
-
-        if (StringUtils.isBlank(clusterId)) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Cluster id is blank which try to set requests in flight" +
-                        " : [cluster] %s", clusterId));
+    void incrementInFlightRequestCount(String clusterId) {
+        synchronized (LoadBalancerStatisticsCollector.class) {
+            if (StringUtils.isBlank(clusterId)) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Cluster id is null, could not increment in-flight request count");
+                }
+                return;
             }
-            return;
-        }
-        if (inFlightRequestToDateListMap.containsKey(clusterId)) {
-
-            Vector<Date> vector = inFlightRequestToDateListMap.get(clusterId);
-            vector.add(new Date());
-            inFlightRequestToDateListMap.put(clusterId, vector);
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("In-flight request added to counting list: [cluster] %s [list size] %s ", clusterId,
-                        inFlightRequestToDateListMap.get(clusterId).size()));
-
+            List<Date> dateList;
+            if (inFlightRequestToDateListMap.containsKey(clusterId)) {
+                dateList = inFlightRequestToDateListMap.get(clusterId);
+            } else {
+                dateList = Collections.synchronizedList(new ArrayList<Date>());
+                inFlightRequestToDateListMap.put(clusterId, dateList);
             }
-
-        } else {
-
-            Vector<Date> vector = new Vector<Date>();
-            vector.add(new Date());
-            inFlightRequestToDateListMap.put(clusterId, vector);
-            inFlightRequestToDateListMap.get(clusterId).add(new Date());
-
+            // Add current date to cluster date list
+            dateList.add(new Date());
             if (log.isDebugEnabled()) {
-                log.debug(String.format("New list is created for storing request in flight count: [cluster] %s ", clusterId));
-                log.debug(String.format("In-flight request added to counting list: [cluster] %s ", clusterId));
+                log.debug(String.format("In-flight request count incremented: [cluster] %s [count] %s ", clusterId,
+                        dateList.size()));
+
             }
         }
     }
 
-
-    public void removeAnInFlightRequest(String clusterId) {
-        if (StringUtils.isBlank(clusterId)) {
-            if (log.isWarnEnabled()) {
-                log.warn(String.format("Cluster id is blank which try to remove a requests in flight" +
-                        " : [cluster] %s ", clusterId));
+    void decrementInFlightRequestCount(String clusterId) {
+        synchronized (LoadBalancerStatisticsCollector.class) {
+            if (StringUtils.isBlank(clusterId)) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Cluster id is null, could not decrement in-flight request count");
+                }
+                return;
             }
-            return;
-        }
-        if (!inFlightRequestToDateListMap.containsKey(clusterId)) {
 
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("In-flight list not available for cluster : [cluster] %s ", clusterId));
-            }
-        } else {
-            Vector<Date> vector = inFlightRequestToDateListMap.get(clusterId);
-            if(!vector.isEmpty()){
-                vector.remove(vector.size() - 1);
-            }
+            if (!inFlightRequestToDateListMap.containsKey(clusterId)) {
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("In-flight request date list not found for cluster: [cluster] %s ", clusterId));
+                }
+            } else {
+                List<Date> dateList = inFlightRequestToDateListMap.get(clusterId);
+                if (!dateList.isEmpty()) {
+                    int index = dateList.size() - 1;
+                    if (index >= 0) {
+                        dateList.remove(index);
+                    }
+                }
 
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("In-flight request removed from counting list: [cluster] %s [list size] %s ", clusterId,
-                                        inFlightRequestToDateListMap.get(clusterId).size()));
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("In-flight request count decremented: [cluster] %s [count] %s ", clusterId,
+                            dateList.size()));
+                }
             }
         }
     }
-
-//    public void incrementInFlightRequestCount(String clusterId) {
-//        incrementInFlightRequestCount(clusterId, 1);
-//    }
-
-//    private void incrementInFlightRequestCount(String clusterId, int value) {
-//        if (StringUtils.isBlank(clusterId)) {
-//            return;
-//        }
-//
-//        int count = getInFlightRequestCount(clusterId);
-//        addAnInFlightRequest(clusterId, (count + value));
-//    }
-
-//    public void decrementInFlightRequestCount(String clusterId) {
-//        decrementInFlightRequestCount(clusterId, 1);
-//    }
-
-//    private void decrementInFlightRequestCount(String clusterId, int value) {
-//        if (StringUtils.isBlank(clusterId)) {
-//            return;
-//        }
-//
-//        int count = getInFlightRequestCount(clusterId);
-//        int newValue = (count - value) < 0 ? 0 : (count - value);
-//        addAnInFlightRequest(clusterId, newValue);
-//    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/b1ae4e25/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsExecutor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsExecutor.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsExecutor.java
new file mode 100644
index 0000000..3351cd6
--- /dev/null
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsExecutor.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.load.balancer.statistics;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * An executor service to asynchronously execute statistics update calls without blocking the
+ * mediation flow.
+ */
+public class LoadBalancerStatisticsExecutor {
+    private static final Log log = LogFactory.getLog(LoadBalancerStatisticsExecutor.class);
+    private static volatile LoadBalancerStatisticsExecutor instance;
+
+    private ExecutorService service;
+
+    private LoadBalancerStatisticsExecutor() {
+        // TODO: Move thread pool count to loadbalancer.conf
+        service = Executors.newFixedThreadPool(10);
+    }
+
+    public static LoadBalancerStatisticsExecutor getInstance() {
+        if (instance == null) {
+            synchronized (LoadBalancerStatisticsExecutor.class) {
+                if (instance == null) {
+                    instance = new LoadBalancerStatisticsExecutor();
+                }
+            }
+        }
+        return instance;
+    }
+
+    public ExecutorService getService() {
+        return service;
+    }
+}

Reply via email to