Updated Branches: refs/heads/master 442be445f -> 7d8a8b3cd
Fixing requests in flight decrement issue Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/7d8a8b3c Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/7d8a8b3c Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/7d8a8b3c Branch: refs/heads/master Commit: 7d8a8b3cdfb112affc230c6716d71a24e0a35c0a Parents: 442be44 Author: Lahiru Sandaruwan <[email protected]> Authored: Wed Jan 15 00:16:11 2014 +0530 Committer: Lahiru Sandaruwan <[email protected]> Committed: Wed Jan 15 00:16:11 2014 +0530 ---------------------------------------------------------------------- .../LoadBalancerStatisticsReader.java | 6 +- .../LoadBalancerStatisticsNotifier.java | 2 +- .../TenantAwareLoadBalanceEndpoint.java | 2 +- .../balancer/mediators/ResponseInterceptor.java | 2 +- .../LoadBalancerStatisticsCollector.java | 112 +++++++++++++------ .../extension/HAProxyStatisticsReader.java | 2 +- 6 files changed, 86 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7d8a8b3c/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java index 41e81e8..0e6e265 100644 --- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java +++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java @@ -19,16 +19,14 @@ package org.apache.stratos.load.balancer.common.statistics; -import java.util.HashMap; - /** * Load balancer statistics reader interface. */ public interface LoadBalancerStatisticsReader { /** - * Get in-flight request count of a given cluster. + * Get in-flight request count of a sliding window configured e.g. Requests in flight of last minute. * @param clusterId */ - int getInFlightRequestCount(String clusterId); + int getInFlightRequestCountOfSlidingWindow(String clusterId); } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7d8a8b3c/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java index 7268695..ebef3f8 100644 --- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java +++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java @@ -78,7 +78,7 @@ public class LoadBalancerStatisticsNotifier implements Runnable { for (Cluster cluster : service.getClusters()) { if (!cluster.isLbCluster()) { // Publish in-flight request count of load balancer's network partition - requestCount = statsReader.getInFlightRequestCount(cluster.getClusterId()); + requestCount = statsReader.getInFlightRequestCountOfSlidingWindow(cluster.getClusterId()); inFlightRequestPublisher.publish(cluster.getClusterId(), networkPartitionId, requestCount); if (log.isDebugEnabled()) { log.debug(String.format("In-flight request count published to cep: [cluster-id] %s [network-partition] %s [value] %d", http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7d8a8b3c/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java index efc88c9..05417b3 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java @@ -526,7 +526,7 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints if(StringUtils.isBlank(clusterId)) { throw new RuntimeException("Cluster id not found in message context"); } - LoadBalancerStatisticsCollector.getInstance().incrementInFlightRequestCount(clusterId); + LoadBalancerStatisticsCollector.getInstance().addAnInFlightRequest(clusterId); } catch (Exception e) { if(log.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7d8a8b3c/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java index cf3e768..e350840 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java @@ -40,7 +40,7 @@ public class ResponseInterceptor extends AbstractMediator implements ManagedLife if (StringUtils.isBlank(clusterId)) { throw new RuntimeException("Cluster id not found in message context"); } - LoadBalancerStatisticsCollector.getInstance().decrementInFlightRequestCount(clusterId); + LoadBalancerStatisticsCollector.getInstance().removeAnInFlightRequest(clusterId); } catch (Exception e) { if(log.isErrorEnabled()) { log.error("Could not decrement in-flight request count", e); http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7d8a8b3c/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java index 72186fe..cff081b 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java @@ -1,18 +1,18 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one + * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ @@ -23,7 +23,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.load.balancer.common.statistics.LoadBalancerStatisticsReader; -import java.util.Map; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; /** @@ -34,10 +34,10 @@ public class LoadBalancerStatisticsCollector implements LoadBalancerStatisticsRe private static volatile LoadBalancerStatisticsCollector instance; // Map<ClusterId, Map<PartitionId, InFlightRequestCount> - private Map<String, Integer> inFlightRequestCountMap; + private Map<String, Vector<Date>> inFlightRequestToDateListMap; private LoadBalancerStatisticsCollector() { - inFlightRequestCountMap = new ConcurrentHashMap<String, Integer>(); + inFlightRequestToDateListMap = new ConcurrentHashMap<String, Vector<Date>>(); } public static LoadBalancerStatisticsCollector getInstance() { @@ -54,48 +54,96 @@ public class LoadBalancerStatisticsCollector implements LoadBalancerStatisticsRe return instance; } - public int getInFlightRequestCount(String clusterId) { - if (inFlightRequestCountMap.containsKey(clusterId)) { - return inFlightRequestCountMap.get(clusterId); + public int getInFlightRequestCountOfSlidingWindow(String clusterId) { + //Clear the list before returning... + + //Sliding window in seconds + int slidingWindow = 60;//TODO get this from a config + + if (inFlightRequestToDateListMap.containsKey(clusterId)) { + Vector<Date> vector = inFlightRequestToDateListMap.get(clusterId); + Iterator<Date> itr = vector.iterator(); + while(itr.hasNext()){ + Date date = itr.next(); + Date currentDate = new Date(); + if((currentDate.getTime() - date.getTime()) > slidingWindow){ // we will remove the + itr.remove(); + } else { + //If the + break; + } + } + return inFlightRequestToDateListMap.get(clusterId).size(); } return 0; } - public void setInFlightRequestCount(String clusterId, int value) { + public void addAnInFlightRequest(String clusterId) { if (StringUtils.isBlank(clusterId)) { + if (log.isWarnEnabled()) { + log.warn(String.format("Cluster id is blank which try to set requests in flight" + + " : [cluster] %s", clusterId)); + } return; } - - inFlightRequestCountMap.put(clusterId, value); + if (!inFlightRequestToDateListMap.containsKey(clusterId)) { + Vector<Date> list = inFlightRequestToDateListMap.get(clusterId); + list.add(new Date()); + inFlightRequestToDateListMap.put(clusterId, list); + } else { + inFlightRequestToDateListMap.get(clusterId).add(new Date()); + } if (log.isDebugEnabled()) { - log.debug(String.format("In-flight request count updated: [cluster] %s [value] %d", clusterId, value)); + log.debug(String.format("In-flight request count updated: [cluster] %s ", clusterId)); } } - public void incrementInFlightRequestCount(String clusterId) { - incrementInFlightRequestCount(clusterId, 1); - } - public void incrementInFlightRequestCount(String clusterId, int value) { + public void removeAnInFlightRequest(String clusterId) { if (StringUtils.isBlank(clusterId)) { + if (log.isWarnEnabled()) { + log.warn(String.format("Cluster id is blank which try to remove a requests in flight" + + " : [cluster] %s ", clusterId)); + } return; } + if (!inFlightRequestToDateListMap.containsKey(clusterId)) { - int count = getInFlightRequestCount(clusterId); - setInFlightRequestCount(clusterId, (count + value)); + if (log.isDebugEnabled()) { + log.debug(String.format("In-flight list not available for cluster : [cluster] %s ", clusterId)); + } + } else { + inFlightRequestToDateListMap.remove(clusterId); + } + if (log.isDebugEnabled()) { + log.debug(String.format("In-flight request count updated: [cluster] %s ", clusterId)); + } } - public void decrementInFlightRequestCount(String clusterId) { - decrementInFlightRequestCount(clusterId, 1); - } +// public void incrementInFlightRequestCount(String clusterId) { +// incrementInFlightRequestCount(clusterId, 1); +// } - public void decrementInFlightRequestCount(String clusterId, int value) { - if (StringUtils.isBlank(clusterId)) { - return; - } +// private void incrementInFlightRequestCount(String clusterId, int value) { +// if (StringUtils.isBlank(clusterId)) { +// return; +// } +// +// int count = getInFlightRequestCount(clusterId); +// addAnInFlightRequest(clusterId, (count + value)); +// } - int count = getInFlightRequestCount(clusterId); - int newValue = (count - value) < 0 ? 0 : (count - value); - setInFlightRequestCount(clusterId, newValue); - } -} +// public void decrementInFlightRequestCount(String clusterId) { +// decrementInFlightRequestCount(clusterId, 1); +// } + +// private void decrementInFlightRequestCount(String clusterId, int value) { +// if (StringUtils.isBlank(clusterId)) { +// return; +// } +// +// int count = getInFlightRequestCount(clusterId); +// int newValue = (count - value) < 0 ? 0 : (count - value); +// addAnInFlightRequest(clusterId, newValue); +// } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7d8a8b3c/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java ---------------------------------------------------------------------- diff --git a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java index b38aa3c..4c35537 100644 --- a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java +++ b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java @@ -46,7 +46,7 @@ public class HAProxyStatisticsReader implements LoadBalancerStatisticsReader { } @Override - public int getInFlightRequestCount(String clusterId) { + public int getInFlightRequestCountOfSlidingWindow(String clusterId) { String frontendId, backendId, command, output; String[] array; int totalWeight, weight;
