Repository: incubator-stratos Updated Branches: refs/heads/master 48e5c0c54 -> 22662cc1f
Fixed in-flight request count management logic on fault requests, added endpoint-timeout configuration parameter to loadbalancer.conf Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/9d088ef1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/9d088ef1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/9d088ef1 Branch: refs/heads/master Commit: 9d088ef16b94455359b82e5ff286d725ebb65cc3 Parents: 24cc743 Author: Imesh Gunaratne <[email protected]> Authored: Sun Apr 13 21:06:37 2014 +0530 Committer: Imesh Gunaratne <[email protected]> Committed: Sun Apr 13 21:06:37 2014 +0530 ---------------------------------------------------------------------- .../LoadBalancerStatisticsReader.java | 2 +- .../LoadBalancerStatisticsNotifier.java | 2 +- .../conf/LoadBalancerConfiguration.java | 26 ++++- .../load/balancer/conf/util/Constants.java | 2 + .../TenantAwareLoadBalanceEndpoint.java | 107 ++++++++++++------- .../LoadBalancerStatisticsCollector.java | 70 ++++++------ .../LoadBalancerStatisticsCollectorTest.java | 87 +++++++++++++++ .../sample/configuration/loadbalancer1.conf | 3 + .../sample/configuration/loadbalancer2.conf | 3 + .../sample/configuration/loadbalancer3.conf | 3 + .../extension/HAProxyStatisticsReader.java | 2 +- .../src/main/conf/loadbalancer.conf | 3 + 12 files changed, 230 insertions(+), 80 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9d088ef1/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java index 0e6e265..4a83aee 100644 --- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java +++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java @@ -28,5 +28,5 @@ public interface LoadBalancerStatisticsReader { * Get in-flight request count of a sliding window configured e.g. Requests in flight of last minute. * @param clusterId */ - int getInFlightRequestCountOfSlidingWindow(String clusterId); + int getInFlightRequestCount(String clusterId); } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9d088ef1/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java index 2e94108..4fe2504 100644 --- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java +++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java @@ -78,7 +78,7 @@ public class LoadBalancerStatisticsNotifier implements Runnable { for (Cluster cluster : service.getClusters()) { if (!cluster.isLbCluster()) { // Publish in-flight request count of load balancer's network partition - requestCount = statsReader.getInFlightRequestCountOfSlidingWindow(cluster.getClusterId()); + requestCount = statsReader.getInFlightRequestCount(cluster.getClusterId()); inFlightRequestPublisher.publish(cluster.getClusterId(), networkPartitionId, requestCount); if (log.isDebugEnabled()) { log.debug(String.format("In-flight request count published to cep: [cluster-id] %s [network-partition] %s [value] %d", http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9d088ef1/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java index 1acf574..5f52411 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java @@ -51,6 +51,7 @@ public class LoadBalancerConfiguration { private String defaultAlgorithmName; private boolean failOverEnabled; private boolean sessionAffinityEnabled; + private long endpointTimeout; private long sessionTimeout; private boolean cepStatsPublisherEnabled; private String mbIp; @@ -78,7 +79,7 @@ public class LoadBalancerConfiguration { /** * Get load balancer configuration singleton instance. * - * @return + * @return Load balancer configuration */ public static LoadBalancerConfiguration getInstance() { if (instance == null) { @@ -130,6 +131,14 @@ public class LoadBalancerConfiguration { this.sessionAffinityEnabled = sessionAffinityEnabled; } + public long getEndpointTimeout() { + return endpointTimeout; + } + + public void setEndpointTimeout(long endpointTimeout) { + this.endpointTimeout = endpointTimeout; + } + public long getSessionTimeout() { return sessionTimeout; } @@ -311,12 +320,27 @@ public class LoadBalancerConfiguration { if (StringUtils.isNotBlank(sessionAffinity)) { configuration.setSessionAffinityEnabled(Boolean.parseBoolean(sessionAffinity)); } + + String endpointTimeout = loadBalancerNode.getProperty(Constants.CONF_PROPERTY_ENDPOINT_TIMEOUT); + if (StringUtils.isNotBlank(endpointTimeout)) { + configuration.setEndpointTimeout(Long.parseLong(endpointTimeout)); + } else { + // Endpoint timeout is not found, set default value + configuration.setEndpointTimeout(Constants.DEFAULT_ENDPOINT_TIMEOUT); + if(log.isWarnEnabled()) { + log.warn(String.format("Endpoint timeout not found, using default: %d", configuration.getEndpointTimeout())); + } + } + String sessionTimeout = loadBalancerNode.getProperty(Constants.CONF_PROPERTY_SESSION_TIMEOUT); if (StringUtils.isNotBlank(sessionTimeout)) { configuration.setSessionTimeout(Long.parseLong(sessionTimeout)); } else { // Session timeout is not found, set default value configuration.setSessionTimeout(Constants.DEFAULT_SESSION_TIMEOUT); + if(log.isWarnEnabled()) { + log.warn(String.format("Session timeout not found, using default: %d", configuration.getSessionTimeout())); + } } String topologyEventListenerEnabled = loadBalancerNode.getProperty(Constants.CONF_PROPERTY_TOPOLOGY_EVENT_LISTENER); http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9d088ef1/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/util/Constants.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/util/Constants.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/util/Constants.java index ce6b8e7..72851af 100755 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/util/Constants.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/util/Constants.java @@ -35,6 +35,7 @@ public class Constants { public static final String CONF_PROPERTY_ALGORITHM = "algorithm"; public static final String CONF_PROPERTY_FAILOVER = "failover"; public static final String CONF_PROPERTY_SESSION_AFFINITY = "session-affinity"; + public static final String CONF_PROPERTY_ENDPOINT_TIMEOUT = "endpoint-timeout"; public static final String CONF_PROPERTY_SESSION_TIMEOUT = "session-timeout"; public static final String CONF_PROPERTY_TOPOLOGY_EVENT_LISTENER = "topology-event-listener"; public static final String CONF_PROPERTY_TOPOLOGY_MEMBER_IP_TYPE = "topology-member-ip-type"; @@ -60,6 +61,7 @@ public class Constants { public static final String CONF_PROPERTY_NETWORK_PARTITION_ID = "network-partition-id"; public static final String CONF_DELIMITER_HOSTS = ","; + public static final long DEFAULT_ENDPOINT_TIMEOUT = 15000; public static final long DEFAULT_SESSION_TIMEOUT = 90000; public static final String STATIC_NETWORK_PARTITION = "static-network-partition"; public static final String STATIC_PARTITION = "static-partition"; http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9d088ef1/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java index 048b26c..3d71a0a 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java @@ -28,6 +28,7 @@ import org.apache.stratos.load.balancer.algorithm.LoadBalanceAlgorithmFactory; import org.apache.stratos.load.balancer.conf.LoadBalancerConfiguration; import org.apache.stratos.load.balancer.conf.domain.MemberIpType; import org.apache.stratos.load.balancer.conf.domain.TenantIdentifier; +import org.apache.stratos.load.balancer.statistics.InFlightRequestDecrementCallable; import org.apache.stratos.load.balancer.statistics.InFlightRequestIncrementCallable; import org.apache.stratos.load.balancer.statistics.LoadBalancerStatisticsExecutor; import org.apache.stratos.load.balancer.util.Constants; @@ -397,10 +398,13 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints endpoint.setEnableMBeanStats(false); endpoint.setName("DLB:" + member.getHostName() + ":" + member.getPort() + ":" + UUID.randomUUID()); + EndpointDefinition definition = new EndpointDefinition(); - definition.setSuspendMaximumDuration(10000); + definition.setTimeoutAction(SynapseConstants.DISCARD_AND_FAULT); + definition.setTimeoutDuration(LoadBalancerConfiguration.getInstance().getEndpointTimeout()); definition.setReplicationDisabled(true); definition.setAddress(to.getAddress()); + endpoint.setDefinition(definition); endpoint.init((SynapseEnvironment) ((Axis2MessageContext) synCtx).getAxis2MessageContext(). @@ -510,13 +514,12 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints Endpoint endpoint = getEndpoint(to, currentMember, synCtx); - if (isFailover()) { - faultHandler.setTo(to); - faultHandler.setCurrentMember(currentMember); - faultHandler.setCurrentEp(endpoint); - synCtx.pushFaultHandler(faultHandler); - synCtx.getEnvelope().build(); - } + // Push fault handler to manage statistics and fail-over logic + faultHandler.setTo(to); + faultHandler.setCurrentMember(currentMember); + faultHandler.setCurrentEp(endpoint); + synCtx.pushFaultHandler(faultHandler); + synCtx.getEnvelope().build(); if (isSessionAffinityBasedLB()) { synCtx.setProperty(SynapseConstants.PROP_SAL_ENDPOINT_DEFAULT_SESSION_TIMEOUT, getSessionTimeout()); @@ -575,6 +578,22 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints } } + private void decrementInFlightRequestCount(MessageContext messageContext) { + try { + String clusterId = (String) messageContext.getProperty(Constants.CLUSTER_ID); + if(StringUtils.isBlank(clusterId)) { + throw new RuntimeException("Cluster id not found in message context"); + } + FutureTask<Object> task = new FutureTask<Object>(new InFlightRequestDecrementCallable(clusterId)); + LoadBalancerStatisticsExecutor.getInstance().getService().submit(task); + } + catch (Exception e) { + if(log.isDebugEnabled()) { + log.debug("Could not decrement in-flight request count", e); + } + } + } + public void setDispatcher(HttpSessionDispatcher dispatcher) { this.dispatcher = dispatcher; } @@ -634,41 +653,57 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints @Override public void onFault(MessageContext synCtx) { - // Cleanup endpoint if exists - if (currentEp != null) { - currentEp.destroy(); - } - if (currentMember == null) { - return; + if (log.isWarnEnabled()) { + log.warn(String.format("A fault detected in message sent to: %s ", (to != null) ? to.getAddress() : "address not found")); } - // Add current member to faulty members - faultyMembers.put(currentMember.getHostName(), true); + // Decrement in-flight request count + decrementInFlightRequestCount(synCtx); - currentMember = findNextMember(synCtx); - if (currentMember == null) { - String msg = String.format("No application members available to serve the request %s", synCtx.getTo().getAddress()); - if (log.isErrorEnabled()) { - log.error(msg); + if (isFailover()) { + if(log.isDebugEnabled()) { + log.debug("Fail-over enabled, trying to send the message to the next available member"); + } + + // Cleanup endpoint if exists + if (currentEp != null) { + currentEp.destroy(); + } + if (currentMember == null) { + if(log.isErrorEnabled()) { + log.error("Current member is null, could not fail-over"); + } + return; } - throwSynapseException(synCtx, 404, msg); - } - if (faultyMembers.containsKey(currentMember.getHostName())) { - // This member has been identified as faulty previously. It implies that - // this request could not be served by any of the members in the cluster. - throwSynapseException(synCtx, 404, String.format("Requested resource could not be found")); - } - synCtx.setTo(to); - if (isSessionAffinityBasedLB()) { - //We are sending the this message on a new session, - // hence we need to remove previous session information - Set pros = synCtx.getPropertyKeySet(); - if (pros != null) { - pros.remove(SynapseConstants.PROP_SAL_CURRENT_SESSION_INFORMATION); + // Add current member to faulty members + faultyMembers.put(currentMember.getHostName(), true); + + currentMember = findNextMember(synCtx); + if (currentMember == null) { + String msg = String.format("No members available to serve the request %s", (to != null) ? to.getAddress() : "address not found"); + if (log.isErrorEnabled()) { + log.error(msg); + } + throwSynapseException(synCtx, 404, msg); + } + if (faultyMembers.containsKey(currentMember.getHostName())) { + // This member has been identified as faulty previously. It implies that + // this request could not be served by any of the members in the cluster. + throwSynapseException(synCtx, 404, String.format("Requested resource could not be found")); + } + + synCtx.setTo(to); + if (isSessionAffinityBasedLB()) { + //We are sending the this message on a new session, + // hence we need to remove previous session information + Set pros = synCtx.getPropertyKeySet(); + if (pros != null) { + pros.remove(SynapseConstants.PROP_SAL_CURRENT_SESSION_INFORMATION); + } } + sendToApplicationMember(synCtx, currentMember, this, true); } - sendToApplicationMember(synCtx, currentMember, this, true); } } } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9d088ef1/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java index b49fcbf..3557d3a 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java @@ -33,11 +33,11 @@ public class LoadBalancerStatisticsCollector implements LoadBalancerStatisticsRe private static final Log log = LogFactory.getLog(LoadBalancerStatisticsCollector.class); private static volatile LoadBalancerStatisticsCollector instance; - // Map<ClusterId, ArrayList<Date>> - private Map<String, List<Date>> inFlightRequestToDateListMap; + // Map<ClusterId, Integer> + private Map<String, Integer> clusterIdRequestCountMap; private LoadBalancerStatisticsCollector() { - inFlightRequestToDateListMap = new ConcurrentHashMap<String, List<Date>>(); + clusterIdRequestCountMap = new ConcurrentHashMap<String, Integer>(); } public static LoadBalancerStatisticsCollector getInstance() { @@ -54,28 +54,22 @@ public class LoadBalancerStatisticsCollector implements LoadBalancerStatisticsRe return instance; } - public int getInFlightRequestCountOfSlidingWindow(String clusterId) { + /** + * Clear load balancer statistics collector singleton instance. + */ + public static void clear() { synchronized (LoadBalancerStatisticsCollector.class) { - // Sliding window in milliseconds - int slidingWindow = 10000; // TODO Move this to loadbalancer.conf + instance = null; + } + } - if (inFlightRequestToDateListMap.containsKey(clusterId)) { - List<Date> dateList = inFlightRequestToDateListMap.get(clusterId); - List<Date> updatedList = Collections.synchronizedList(new ArrayList<Date>()); - Date currentDate = new Date(); - long slidingWindStart = currentDate.getTime() - slidingWindow; - int count = 0; - for (Date date : dateList) { - if (date.getTime() > slidingWindStart) { - count++; - } - else { - updatedList.add(date); - } + public int getInFlightRequestCount(String clusterId) { + synchronized (LoadBalancerStatisticsCollector.class) { + if (clusterIdRequestCountMap.containsKey(clusterId)) { + Integer count = clusterIdRequestCountMap.get(clusterId); + if(count != null) { + return count; } - // Remove dates counted - inFlightRequestToDateListMap.put(clusterId, updatedList); - return count; } return 0; } @@ -89,18 +83,16 @@ public class LoadBalancerStatisticsCollector implements LoadBalancerStatisticsRe } return; } - List<Date> dateList; - if (inFlightRequestToDateListMap.containsKey(clusterId)) { - dateList = inFlightRequestToDateListMap.get(clusterId); - } else { - dateList = Collections.synchronizedList(new ArrayList<Date>()); - inFlightRequestToDateListMap.put(clusterId, dateList); + Integer count = 0; + if (clusterIdRequestCountMap.containsKey(clusterId)) { + count = clusterIdRequestCountMap.get(clusterId); } - // Add current date to cluster date list - dateList.add(new Date()); + count++; + clusterIdRequestCountMap.put(clusterId, count); + if (log.isDebugEnabled()) { log.debug(String.format("In-flight request count incremented: [cluster] %s [count] %s ", clusterId, - dateList.size())); + count)); } } @@ -115,22 +107,20 @@ public class LoadBalancerStatisticsCollector implements LoadBalancerStatisticsRe return; } - if (!inFlightRequestToDateListMap.containsKey(clusterId)) { + if (!clusterIdRequestCountMap.containsKey(clusterId)) { if (log.isDebugEnabled()) { - log.debug(String.format("In-flight request date list not found for cluster: [cluster] %s ", clusterId)); + log.debug(String.format("In-flight request count not found for cluster, could not decrement in-flight request count: [cluster] %s ", clusterId)); } } else { - List<Date> dateList = inFlightRequestToDateListMap.get(clusterId); - if (!dateList.isEmpty()) { - int index = dateList.size() - 1; - if (index >= 0) { - dateList.remove(index); - } + Integer count = clusterIdRequestCountMap.get(clusterId); + if (count != null) { + count = (count >= 1) ? (count - 1) : 0; } + clusterIdRequestCountMap.put(clusterId, count); if (log.isDebugEnabled()) { log.debug(String.format("In-flight request count decremented: [cluster] %s [count] %s ", clusterId, - dateList.size())); + count)); } } } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9d088ef1/components/org.apache.stratos.load.balancer/src/test/java/org/apache/stratos/load/balancer/test/LoadBalancerStatisticsCollectorTest.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/test/java/org/apache/stratos/load/balancer/test/LoadBalancerStatisticsCollectorTest.java b/components/org.apache.stratos.load.balancer/src/test/java/org/apache/stratos/load/balancer/test/LoadBalancerStatisticsCollectorTest.java new file mode 100644 index 0000000..26b5f34 --- /dev/null +++ b/components/org.apache.stratos.load.balancer/src/test/java/org/apache/stratos/load/balancer/test/LoadBalancerStatisticsCollectorTest.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + + * http://www.apache.org/licenses/LICENSE-2.0 + + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.load.balancer.test; + +import org.apache.stratos.load.balancer.statistics.InFlightRequestIncrementCallable; +import org.apache.stratos.load.balancer.statistics.LoadBalancerStatisticsCollector; +import org.junit.Assert; +import org.apache.stratos.load.balancer.statistics.InFlightRequestDecrementCallable; +import org.apache.stratos.load.balancer.statistics.LoadBalancerStatisticsExecutor; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; + +/** + * Load balancer statistics collector tests. + */ +@RunWith(JUnit4.class) +public class LoadBalancerStatisticsCollectorTest { + + /*** + * Test in-flight request count calculation. + */ + @Test + public void testInFlightRequestCountCalculation() { + String clusterId = "cluster1"; + String incrementErrorMessage = "Could not increment in-flight request count: "; + String decrementErrorMessage = "Could not decrement in-flight request count: "; + + FutureTask<Object> task = new FutureTask<Object>(new InFlightRequestIncrementCallable(clusterId)); + executeTask(task); + Assert.assertEquals(incrementErrorMessage, 1, LoadBalancerStatisticsCollector.getInstance().getInFlightRequestCount(clusterId)); + + task = new FutureTask<Object>(new InFlightRequestIncrementCallable(clusterId)); + executeTask(task); + Assert.assertEquals(incrementErrorMessage, 2, LoadBalancerStatisticsCollector.getInstance().getInFlightRequestCount(clusterId)); + + task = new FutureTask<Object>(new InFlightRequestIncrementCallable(clusterId)); + executeTask(task); + Assert.assertEquals(incrementErrorMessage, 3, LoadBalancerStatisticsCollector.getInstance().getInFlightRequestCount(clusterId)); + + task = new FutureTask<Object>(new InFlightRequestDecrementCallable(clusterId)); + executeTask(task); + Assert.assertEquals(decrementErrorMessage, 2, LoadBalancerStatisticsCollector.getInstance().getInFlightRequestCount(clusterId)); + + task = new FutureTask<Object>(new InFlightRequestDecrementCallable(clusterId)); + executeTask(task); + Assert.assertEquals(decrementErrorMessage, 1, LoadBalancerStatisticsCollector.getInstance().getInFlightRequestCount(clusterId)); + + task = new FutureTask<Object>(new InFlightRequestDecrementCallable(clusterId)); + executeTask(task); + Assert.assertEquals(decrementErrorMessage, 0, LoadBalancerStatisticsCollector.getInstance().getInFlightRequestCount(clusterId)); + + LoadBalancerStatisticsCollector.clear(); + } + + private void executeTask(FutureTask<Object> task) { + Future future = LoadBalancerStatisticsExecutor.getInstance().getService().submit(task); + while(!future.isDone()) { + // Wait until task get executed + try { + Thread.sleep(100); + } catch (InterruptedException e) { + // Might not need to trace + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9d088ef1/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer1.conf ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer1.conf b/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer1.conf index 0d315de..ff11dac 100755 --- a/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer1.conf +++ b/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer1.conf @@ -37,6 +37,9 @@ loadbalancer { # incoming requests to members with same sessions. session-affinity: true; + # Endpoint timeout in milli-seconds + endpoint-timeout: 15000; + # Session timeout in milli-seconds session-timeout: 90000; http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9d088ef1/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer2.conf ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer2.conf b/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer2.conf index 37325d1..9910dc9 100755 --- a/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer2.conf +++ b/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer2.conf @@ -37,6 +37,9 @@ loadbalancer { # incoming requests to members with same sessions. session-affinity: true; + # Endpoint timeout in milli-seconds + endpoint-timeout: 15000; + # Session timeout in milli-seconds session-timeout: 90000; http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9d088ef1/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer3.conf ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer3.conf b/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer3.conf index b5d915f..a629a6f 100755 --- a/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer3.conf +++ b/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer3.conf @@ -37,6 +37,9 @@ loadbalancer { # incoming requests to members with same sessions. session-affinity: true; + # Endpoint timeout in milli-seconds + endpoint-timeout: 15000; + # Session timeout in milli-seconds session-timeout: 90000; http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9d088ef1/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java ---------------------------------------------------------------------- diff --git a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java index 4c35537..b38aa3c 100644 --- a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java +++ b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java @@ -46,7 +46,7 @@ public class HAProxyStatisticsReader implements LoadBalancerStatisticsReader { } @Override - public int getInFlightRequestCountOfSlidingWindow(String clusterId) { + public int getInFlightRequestCount(String clusterId) { String frontendId, backendId, command, output; String[] array; int totalWeight, weight; http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9d088ef1/products/load-balancer/modules/distribution/src/main/conf/loadbalancer.conf ---------------------------------------------------------------------- diff --git a/products/load-balancer/modules/distribution/src/main/conf/loadbalancer.conf b/products/load-balancer/modules/distribution/src/main/conf/loadbalancer.conf index 7b0d511..7f61b8a 100644 --- a/products/load-balancer/modules/distribution/src/main/conf/loadbalancer.conf +++ b/products/load-balancer/modules/distribution/src/main/conf/loadbalancer.conf @@ -31,6 +31,9 @@ loadbalancer { # incoming requests to members with same sessions. session-affinity: true; + # Endpoint timeout in milli-seconds + endpoint-timeout: 15000; + # Session timeout in milli-seconds session-timeout: 90000;
