Repository: incubator-stratos Updated Branches: refs/heads/master ac003748a -> 7eff3e8d6
Implemented new functionality to execute statistics update calls using an executor service Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/b1ae4e25 Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/b1ae4e25 Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/b1ae4e25 Branch: refs/heads/master Commit: b1ae4e2585f736ae7221da6bad1880c4d65c947e Parents: 53f1fc8 Author: Imesh Gunaratne <[email protected]> Authored: Thu Feb 27 02:01:31 2014 -0500 Committer: Imesh Gunaratne <[email protected]> Committed: Thu Feb 27 02:01:31 2014 -0500 ---------------------------------------------------------------------- .../TenantAwareLoadBalanceEndpoint.java | 7 +- .../balancer/mediators/ResponseInterceptor.java | 22 +-- .../InFlightRequestDecrementCallable.java | 38 +++++ .../InFlightRequestIncrementCallable.java | 38 +++++ .../LoadBalancerStatisticsCollector.java | 159 ++++++++----------- .../LoadBalancerStatisticsExecutor.java | 56 +++++++ 6 files changed, 214 insertions(+), 106 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/b1ae4e25/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java index aeeb3a1..b88848e 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java @@ -28,7 +28,8 @@ import org.apache.stratos.load.balancer.algorithm.LoadBalanceAlgorithmFactory; import org.apache.stratos.load.balancer.conf.LoadBalancerConfiguration; import org.apache.stratos.load.balancer.conf.domain.MemberIpType; import org.apache.stratos.load.balancer.conf.domain.TenantIdentifier; -import org.apache.stratos.load.balancer.statistics.LoadBalancerStatisticsCollector; +import org.apache.stratos.load.balancer.statistics.InFlightRequestIncrementCallable; +import org.apache.stratos.load.balancer.statistics.LoadBalancerStatisticsExecutor; import org.apache.stratos.load.balancer.util.Constants; import org.apache.stratos.messaging.domain.tenant.Tenant; import org.apache.stratos.messaging.domain.topology.Member; @@ -51,6 +52,7 @@ import java.io.Serializable; import java.net.MalformedURLException; import java.net.URL; import java.util.*; +import java.util.concurrent.FutureTask; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -559,7 +561,8 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints if(StringUtils.isBlank(clusterId)) { throw new RuntimeException("Cluster id not found in message context"); } - LoadBalancerStatisticsCollector.getInstance().addAnInFlightRequest(clusterId); + FutureTask<Object> task = new FutureTask<Object>(new InFlightRequestIncrementCallable(clusterId)); + LoadBalancerStatisticsExecutor.getInstance().getService().submit(task); } catch (Exception e) { if(log.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/b1ae4e25/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java index 7ca6c8d..3913248 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java @@ -19,18 +19,28 @@ package org.apache.stratos.load.balancer.mediators; import org.apache.commons.lang3.StringUtils; -import org.apache.stratos.load.balancer.statistics.LoadBalancerStatisticsCollector; +import org.apache.stratos.load.balancer.statistics.InFlightRequestDecrementCallable; +import org.apache.stratos.load.balancer.statistics.LoadBalancerStatisticsExecutor; import org.apache.stratos.load.balancer.util.Constants; import org.apache.synapse.ManagedLifecycle; import org.apache.synapse.MessageContext; import org.apache.synapse.core.SynapseEnvironment; import org.apache.synapse.mediators.AbstractMediator; +import java.util.concurrent.FutureTask; + /** * This Synapse mediator counts the responses that are going across LB. */ public class ResponseInterceptor extends AbstractMediator implements ManagedLifecycle { + @Override + public void init(SynapseEnvironment arg) { + if (log.isDebugEnabled()) { + log.debug("ResponseInterceptor mediator initiated"); + } + } + public boolean mediate(MessageContext messageContext) { try { if (log.isDebugEnabled()) { @@ -38,7 +48,8 @@ public class ResponseInterceptor extends AbstractMediator implements ManagedLife } String clusterId = (String) messageContext.getProperty(Constants.CLUSTER_ID); if (StringUtils.isNotBlank(clusterId)) { - LoadBalancerStatisticsCollector.getInstance().removeAnInFlightRequest(clusterId); + FutureTask<Object> task = new FutureTask<Object>(new InFlightRequestDecrementCallable(clusterId)); + LoadBalancerStatisticsExecutor.getInstance().getService().submit(task); } else{ if (log.isDebugEnabled()) { log.debug("Could not decrement in-flight request count : cluster id not found in message context"); @@ -59,11 +70,4 @@ public class ResponseInterceptor extends AbstractMediator implements ManagedLife log.debug("Response interceptor mediator destroyed"); } } - - @Override - public void init(SynapseEnvironment arg0) { - if (log.isDebugEnabled()) { - log.debug("ResponseInterceptor mediator initiated"); - } - } } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/b1ae4e25/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/InFlightRequestDecrementCallable.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/InFlightRequestDecrementCallable.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/InFlightRequestDecrementCallable.java new file mode 100644 index 0000000..a880d32 --- /dev/null +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/InFlightRequestDecrementCallable.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.load.balancer.statistics; + +import java.util.concurrent.Callable; + +/** + * In-flight request decrement callable definition. + */ +public class InFlightRequestDecrementCallable implements Callable { + private String clusterId; + + public InFlightRequestDecrementCallable(String clusterId) { + this.clusterId = clusterId; + } + + @Override + public Object call() throws Exception { + LoadBalancerStatisticsCollector.getInstance().decrementInFlightRequestCount(clusterId); + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/b1ae4e25/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/InFlightRequestIncrementCallable.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/InFlightRequestIncrementCallable.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/InFlightRequestIncrementCallable.java new file mode 100644 index 0000000..e670eab --- /dev/null +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/InFlightRequestIncrementCallable.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.load.balancer.statistics; + +import java.util.concurrent.Callable; + +/** + * In-flight request increment callable definition. + */ +public class InFlightRequestIncrementCallable implements Callable { + private String clusterId; + + public InFlightRequestIncrementCallable(String clusterId) { + this.clusterId = clusterId; + } + + @Override + public Object call() throws Exception { + LoadBalancerStatisticsCollector.getInstance().incrementInFlightRequestCount(clusterId); + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/b1ae4e25/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java index f522a27..b49fcbf 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java @@ -33,18 +33,18 @@ public class LoadBalancerStatisticsCollector implements LoadBalancerStatisticsRe private static final Log log = LogFactory.getLog(LoadBalancerStatisticsCollector.class); private static volatile LoadBalancerStatisticsCollector instance; - // Map<ClusterId, Map<PartitionId, InFlightRequestCount> - private Map<String, Vector<Date>> inFlightRequestToDateListMap; + // Map<ClusterId, ArrayList<Date>> + private Map<String, List<Date>> inFlightRequestToDateListMap; private LoadBalancerStatisticsCollector() { - inFlightRequestToDateListMap = new ConcurrentHashMap<String, Vector<Date>>(); + inFlightRequestToDateListMap = new ConcurrentHashMap<String, List<Date>>(); } public static LoadBalancerStatisticsCollector getInstance() { if (instance == null) { synchronized (LoadBalancerStatisticsCollector.class) { if (instance == null) { - if(log.isDebugEnabled()) { + if (log.isDebugEnabled()) { log.debug("Load balancer in-flight request count collector instance created"); } instance = new LoadBalancerStatisticsCollector(); @@ -55,115 +55,84 @@ public class LoadBalancerStatisticsCollector implements LoadBalancerStatisticsRe } public int getInFlightRequestCountOfSlidingWindow(String clusterId) { - //Clear the list before returning... + synchronized (LoadBalancerStatisticsCollector.class) { + // Sliding window in milliseconds + int slidingWindow = 10000; // TODO Move this to loadbalancer.conf - //Sliding window in Milliseconds - int slidingWindow = 60000;//TODO get this from a config - - - if (inFlightRequestToDateListMap.containsKey(clusterId)) { - Vector<Date> vector = inFlightRequestToDateListMap.get(clusterId); - Iterator<Date> itr = vector.iterator(); - while(itr.hasNext()){ - Date date = itr.next(); + if (inFlightRequestToDateListMap.containsKey(clusterId)) { + List<Date> dateList = inFlightRequestToDateListMap.get(clusterId); + List<Date> updatedList = Collections.synchronizedList(new ArrayList<Date>()); Date currentDate = new Date(); - if((currentDate.getTime() - date.getTime()) > slidingWindow){ // we will remove the - itr.remove(); - } else { - //If the - break; + long slidingWindStart = currentDate.getTime() - slidingWindow; + int count = 0; + for (Date date : dateList) { + if (date.getTime() > slidingWindStart) { + count++; + } + else { + updatedList.add(date); + } } + // Remove dates counted + inFlightRequestToDateListMap.put(clusterId, updatedList); + return count; } - return inFlightRequestToDateListMap.get(clusterId).size(); + return 0; } - return 0; } - public void addAnInFlightRequest(String clusterId) { - - if (StringUtils.isBlank(clusterId)) { - if (log.isDebugEnabled()) { - log.debug(String.format("Cluster id is blank which try to set requests in flight" + - " : [cluster] %s", clusterId)); + void incrementInFlightRequestCount(String clusterId) { + synchronized (LoadBalancerStatisticsCollector.class) { + if (StringUtils.isBlank(clusterId)) { + if (log.isDebugEnabled()) { + log.debug("Cluster id is null, could not increment in-flight request count"); + } + return; } - return; - } - if (inFlightRequestToDateListMap.containsKey(clusterId)) { - - Vector<Date> vector = inFlightRequestToDateListMap.get(clusterId); - vector.add(new Date()); - inFlightRequestToDateListMap.put(clusterId, vector); - if (log.isDebugEnabled()) { - log.debug(String.format("In-flight request added to counting list: [cluster] %s [list size] %s ", clusterId, - inFlightRequestToDateListMap.get(clusterId).size())); - + List<Date> dateList; + if (inFlightRequestToDateListMap.containsKey(clusterId)) { + dateList = inFlightRequestToDateListMap.get(clusterId); + } else { + dateList = Collections.synchronizedList(new ArrayList<Date>()); + inFlightRequestToDateListMap.put(clusterId, dateList); } - - } else { - - Vector<Date> vector = new Vector<Date>(); - vector.add(new Date()); - inFlightRequestToDateListMap.put(clusterId, vector); - inFlightRequestToDateListMap.get(clusterId).add(new Date()); - + // Add current date to cluster date list + dateList.add(new Date()); if (log.isDebugEnabled()) { - log.debug(String.format("New list is created for storing request in flight count: [cluster] %s ", clusterId)); - log.debug(String.format("In-flight request added to counting list: [cluster] %s ", clusterId)); + log.debug(String.format("In-flight request count incremented: [cluster] %s [count] %s ", clusterId, + dateList.size())); + } } } - - public void removeAnInFlightRequest(String clusterId) { - if (StringUtils.isBlank(clusterId)) { - if (log.isWarnEnabled()) { - log.warn(String.format("Cluster id is blank which try to remove a requests in flight" + - " : [cluster] %s ", clusterId)); + void decrementInFlightRequestCount(String clusterId) { + synchronized (LoadBalancerStatisticsCollector.class) { + if (StringUtils.isBlank(clusterId)) { + if (log.isDebugEnabled()) { + log.debug("Cluster id is null, could not decrement in-flight request count"); + } + return; } - return; - } - if (!inFlightRequestToDateListMap.containsKey(clusterId)) { - if (log.isDebugEnabled()) { - log.debug(String.format("In-flight list not available for cluster : [cluster] %s ", clusterId)); - } - } else { - Vector<Date> vector = inFlightRequestToDateListMap.get(clusterId); - if(!vector.isEmpty()){ - vector.remove(vector.size() - 1); - } + if (!inFlightRequestToDateListMap.containsKey(clusterId)) { + if (log.isDebugEnabled()) { + log.debug(String.format("In-flight request date list not found for cluster: [cluster] %s ", clusterId)); + } + } else { + List<Date> dateList = inFlightRequestToDateListMap.get(clusterId); + if (!dateList.isEmpty()) { + int index = dateList.size() - 1; + if (index >= 0) { + dateList.remove(index); + } + } - if (log.isDebugEnabled()) { - log.debug(String.format("In-flight request removed from counting list: [cluster] %s [list size] %s ", clusterId, - inFlightRequestToDateListMap.get(clusterId).size())); + if (log.isDebugEnabled()) { + log.debug(String.format("In-flight request count decremented: [cluster] %s [count] %s ", clusterId, + dateList.size())); + } } } } - -// public void incrementInFlightRequestCount(String clusterId) { -// incrementInFlightRequestCount(clusterId, 1); -// } - -// private void incrementInFlightRequestCount(String clusterId, int value) { -// if (StringUtils.isBlank(clusterId)) { -// return; -// } -// -// int count = getInFlightRequestCount(clusterId); -// addAnInFlightRequest(clusterId, (count + value)); -// } - -// public void decrementInFlightRequestCount(String clusterId) { -// decrementInFlightRequestCount(clusterId, 1); -// } - -// private void decrementInFlightRequestCount(String clusterId, int value) { -// if (StringUtils.isBlank(clusterId)) { -// return; -// } -// -// int count = getInFlightRequestCount(clusterId); -// int newValue = (count - value) < 0 ? 0 : (count - value); -// addAnInFlightRequest(clusterId, newValue); -// } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/b1ae4e25/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsExecutor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsExecutor.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsExecutor.java new file mode 100644 index 0000000..3351cd6 --- /dev/null +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsExecutor.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.load.balancer.statistics; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * An executor service to asynchronously execute statistics update calls without blocking the + * mediation flow. + */ +public class LoadBalancerStatisticsExecutor { + private static final Log log = LogFactory.getLog(LoadBalancerStatisticsExecutor.class); + private static volatile LoadBalancerStatisticsExecutor instance; + + private ExecutorService service; + + private LoadBalancerStatisticsExecutor() { + // TODO: Move thread pool count to loadbalancer.conf + service = Executors.newFixedThreadPool(10); + } + + public static LoadBalancerStatisticsExecutor getInstance() { + if (instance == null) { + synchronized (LoadBalancerStatisticsExecutor.class) { + if (instance == null) { + instance = new LoadBalancerStatisticsExecutor(); + } + } + } + return instance; + } + + public ExecutorService getService() { + return service; + } +}
