Updated Branches: refs/heads/master d29ee5d60 -> 24101ae18
Added load balancing statistics reporting interface to load balancing extension api Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/24101ae1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/24101ae1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/24101ae1 Branch: refs/heads/master Commit: 24101ae18fb63af9306a1a9d7c101ea66909771f Parents: d29ee5d Author: Imesh Gunaratne <[email protected]> Authored: Thu Nov 14 19:16:38 2013 +0530 Committer: Imesh Gunaratne <[email protected]> Committed: Thu Nov 14 19:16:38 2013 +0530 ---------------------------------------------------------------------- .../statistics/LoadBalancerStatsPublisher.java | 34 +++++ .../statistics/LoadBalancingStatsCollector.java | 110 ---------------- .../statistics/WSO2CEPStatsPublisher.java | 97 ++++++++++++++ .../observers/WSO2CEPStatsObserver.java | 102 --------------- .../extension/api/LoadBalancerExtension.java | 23 +++- .../api/LoadBalancerStatsNotifier.java | 77 +++++++++++ .../extension/api/LoadBalancerStatsReader.java | 34 +++++ .../TenantAwareLoadBalanceEndpoint.java | 4 +- .../balancer/mediators/ResponseInterceptor.java | 4 +- .../statistics/LoadBalancerStatsCollector.java | 128 +++++++++++++++++++ .../observers/WSO2CEPStatsObserver.java | 40 ++++++ .../haproxy/extension/HAProxyStatsReader.java | 33 +++++ .../apache/stratos/haproxy/extension/Main.java | 3 +- 13 files changed, 468 insertions(+), 221 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/24101ae1/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatsPublisher.java ---------------------------------------------------------------------- diff --git 
a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatsPublisher.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatsPublisher.java new file mode 100644 index 0000000..5687d76 --- /dev/null +++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatsPublisher.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.load.balancer.common.statistics; + +import java.util.Map; + +/** + * Load balancer statistics publisher interface. + */ +public interface LoadBalancerStatsPublisher { + + /** + * Publish statistics as a map of Cluster Id, In-flight Request Count. 
+ * @param stats + */ + void publish(Map<String, Integer> stats); +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/24101ae1/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancingStatsCollector.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancingStatsCollector.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancingStatsCollector.java deleted file mode 100644 index a55fc22..0000000 --- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancingStatsCollector.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.stratos.load.balancer.common.statistics; - -import java.util.HashMap; -import java.util.Map; -import java.util.Observable; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.stratos.load.balancer.common.statistics.observers.WSO2CEPStatsObserver; - -/** - * This is the load balancing stats collector and any observer can get registered here - * and receive notifications periodically. - * This is a Singleton object. - * @author nirmal - * - */ -public class LoadBalancingStatsCollector extends Observable{ - - private static LoadBalancingStatsCollector collector; - private Map<String, Integer> clusterIdToRequestInflightCountMap; - private Thread notifier; - - private LoadBalancingStatsCollector() { - clusterIdToRequestInflightCountMap = new ConcurrentHashMap<String, Integer>(); - if (notifier == null || (notifier != null && !notifier.isAlive())) { - notifier = new Thread(new ObserverNotifier()); - notifier.start(); - } - } - - public static LoadBalancingStatsCollector getInstance() { - if (collector == null) { - synchronized (LoadBalancingStatsCollector.class) { - if (collector == null) { - collector = new LoadBalancingStatsCollector(); - // add observers - collector.addObserver(new WSO2CEPStatsObserver()); - } - } - } - return collector; - } - - public void incrementRequestInflightCount(String clusterId) { - if(clusterId == null) { - return; - } - - int value = 1; - if(clusterIdToRequestInflightCountMap.get(clusterId) != null) { - value += clusterIdToRequestInflightCountMap.get(clusterId); - } - clusterIdToRequestInflightCountMap.put(clusterId, value); - setChanged(); - } - - public void decrementRequestInflightCount(String clusterId) { - if(clusterId == null) { - return; - } - - int value = -1; - if(clusterIdToRequestInflightCountMap.get(clusterId) != null) { - value += clusterIdToRequestInflightCountMap.get(clusterId); - } - clusterIdToRequestInflightCountMap.put(clusterId, value); - setChanged(); - } - - - /** - * This 
thread will notify all the observers of this subject. - * @author nirmal - * - */ - private class ObserverNotifier implements Runnable { - - @Override - public void run() { - while(true) { - try { - Thread.sleep(15000); - } catch (InterruptedException ignore) { - } - LoadBalancingStatsCollector.getInstance().notifyObservers(new HashMap<String, Integer>(clusterIdToRequestInflightCountMap)); - } - - } - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/24101ae1/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPStatsPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPStatsPublisher.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPStatsPublisher.java new file mode 100644 index 0000000..2d41242 --- /dev/null +++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPStatsPublisher.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.load.balancer.common.statistics; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.wso2.carbon.databridge.agent.thrift.Agent; +import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher; +import org.wso2.carbon.databridge.agent.thrift.conf.AgentConfiguration; +import org.wso2.carbon.databridge.agent.thrift.exception.AgentException; +import org.wso2.carbon.databridge.commons.Event; +import org.wso2.carbon.utils.CarbonUtils; + +import java.io.File; +import java.util.HashMap; +import java.util.Map; + +/** + * WSO2 CEP statistics publisher for the load balancer. + */ +public class WSO2CEPStatsPublisher implements LoadBalancerStatsPublisher { + private static final Log log = LogFactory.getLog(WSO2CEPStatsPublisher.class); + private static final String CALL_CENTER_DATA_STREAM = "stratos.lb.stats"; + private static final String VERSION = "1.0.0"; + private AsyncDataPublisher asyncDataPublisher; + + public WSO2CEPStatsPublisher() { + AgentConfiguration agentConfiguration = new AgentConfiguration(); + // TODO get following from somewhere, without hard-coding. + System.setProperty("javax.net.ssl.trustStore", CarbonUtils.getCarbonHome()+ File.separator+"repository"+ + File.separator+"resources"+File.separator+"security"+File.separator+"client-truststore.jks" ); + System.setProperty("javax.net.ssl.trustStorePassword", "wso2carbon"); + + Agent agent = new Agent(agentConfiguration); + //TODO read following from a config file? 
+ String ip = System.getProperty("thrift.receiver.ip"); + String port = System.getProperty("thrift.receiver.port"); + //Using Asynchronous data publisher + asyncDataPublisher = new AsyncDataPublisher("tcp://"+ip+":"+port+"", "admin", "admin", agent); + String streamDefinition = "{" + + " 'name':'" + CALL_CENTER_DATA_STREAM + "'," + + " 'version':'" + VERSION + "'," + + " 'nickName': 'lb stats'," + + " 'description': 'lb stats'," + + " 'metaData':[]," + + " 'payloadData':[" + + " {'name':'cluster_id','type':'STRING'}," + + " {'name':'in_flight_requests','type':'INT'}" + + " ]" + + "}"; + asyncDataPublisher.addStreamDefinition(streamDefinition, CALL_CENTER_DATA_STREAM, VERSION); + } + + @Override + public void publish(Map<String, Integer> stats) { + + for (Map.Entry<String, Integer> entry : stats.entrySet()) { + + Object[] payload = new Object[]{entry.getKey(), entry.getValue()}; + Event event = eventObject(null, null, payload, new HashMap<String, String>()); + try { + asyncDataPublisher.publish(CALL_CENTER_DATA_STREAM, VERSION, event); + } catch (AgentException e) { + log.error("Failed to publish events. 
", e); + } + + } + stats = null; + } + + private static Event eventObject(Object[] correlationData, Object[] metaData, + Object[] payLoadData, HashMap<String, String> map) { + Event event = new Event(); + event.setCorrelationData(correlationData); + event.setMetaData(metaData); + event.setPayloadData(payLoadData); + event.setArbitraryDataMap(map); + return event; + } +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/24101ae1/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/observers/WSO2CEPStatsObserver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/observers/WSO2CEPStatsObserver.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/observers/WSO2CEPStatsObserver.java deleted file mode 100644 index f53c5a5..0000000 --- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/observers/WSO2CEPStatsObserver.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.stratos.load.balancer.common.statistics.observers; - -import java.io.File; -import java.util.HashMap; -import java.util.Map; -import java.util.Observable; -import java.util.Observer; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.wso2.carbon.databridge.agent.thrift.Agent; -import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher; -import org.wso2.carbon.databridge.agent.thrift.conf.AgentConfiguration; -import org.wso2.carbon.databridge.agent.thrift.exception.AgentException; -import org.wso2.carbon.databridge.commons.Event; -import org.wso2.carbon.utils.CarbonUtils; - -public class WSO2CEPStatsObserver implements Observer{ - - private static final Log log = LogFactory.getLog(WSO2CEPStatsObserver.class); - private static final String CALL_CENTER_DATA_STREAM = "stratos.lb.stats"; - private static final String VERSION = "1.0.0"; - private AsyncDataPublisher asyncDataPublisher; - - public WSO2CEPStatsObserver() { - AgentConfiguration agentConfiguration = new AgentConfiguration(); - // TODO get following from somewhere, without hard-coding. - System.setProperty("javax.net.ssl.trustStore", CarbonUtils.getCarbonHome()+File.separator+"repository"+ - File.separator+"resources"+File.separator+"security"+File.separator+"client-truststore.jks" ); - System.setProperty("javax.net.ssl.trustStorePassword", "wso2carbon"); - - Agent agent = new Agent(agentConfiguration); - //TODO read following from a config file? 
- String ip = System.getProperty("thrift.receiver.ip"); - String port = System.getProperty("thrift.receiver.port"); - //Using Asynchronous data publisher - asyncDataPublisher = new AsyncDataPublisher("tcp://"+ip+":"+port+"", "admin", "admin", agent); - String streamDefinition = "{" + - " 'name':'" + CALL_CENTER_DATA_STREAM + "'," + - " 'version':'" + VERSION + "'," + - " 'nickName': 'lb stats'," + - " 'description': 'lb stats'," + - " 'metaData':[]," + - " 'payloadData':[" + - " {'name':'cluster_id','type':'STRING'}," + - " {'name':'in_flight_requests','type':'INT'}" + - " ]" + - "}"; - asyncDataPublisher.addStreamDefinition(streamDefinition, CALL_CENTER_DATA_STREAM, VERSION); - } - - public void update(Observable arg0, Object arg1) { - if(arg1 != null && arg1 instanceof Map<?, ?>) { - Map<String, Integer> stats = (Map<String, Integer>)arg1; - publishEvents(stats); - } - } - - private void publishEvents(Map<String, Integer> stats) { - - for (Map.Entry<String, Integer> entry : stats.entrySet()) { - - Object[] payload = new Object[]{entry.getKey(), entry.getValue()}; - Event event = eventObject(null, null, payload, new HashMap<String, String>()); - try { - asyncDataPublisher.publish(CALL_CENTER_DATA_STREAM, VERSION, event); - } catch (AgentException e) { - log.error("Failed to publish events. 
", e); - } - - } - stats = null; - } - - private static Event eventObject(Object[] correlationData, Object[] metaData, - Object[] payLoadData, HashMap<String, String> map) { - Event event = new Event(); - event.setCorrelationData(correlationData); - event.setMetaData(metaData); - event.setPayloadData(payLoadData); - event.setArbitraryDataMap(map); - return event; - } -} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/24101ae1/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java index c2dbe50..b1118d5 100644 --- a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java +++ b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java @@ -21,13 +21,20 @@ package org.apache.stratos.load.balancer.extension.api; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.stratos.load.balancer.common.statistics.LoadBalancerStatsPublisher; +import org.apache.stratos.load.balancer.common.statistics.WSO2CEPStatsPublisher; import org.apache.stratos.load.balancer.common.topology.TopologyReceiver; +import org.apache.stratos.messaging.domain.topology.Cluster; +import org.apache.stratos.messaging.domain.topology.Service; import org.apache.stratos.messaging.event.Event; import org.apache.stratos.messaging.event.topology.*; import 
org.apache.stratos.messaging.message.processor.topology.TopologyEventProcessorChain; import org.apache.stratos.messaging.message.receiver.topology.TopologyEventMessageDelegator; import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; +import java.util.HashMap; +import java.util.Map; + /** * Load balancer extension thread for executing load balancer life-cycle according to the topology updates * received from the message broker. @@ -36,18 +43,26 @@ public class LoadBalancerExtension implements Runnable { private static final Log log = LogFactory.getLog(LoadBalancerExtension.class); private LoadBalancer loadBalancer; + private LoadBalancerStatsReader statsReader; - public LoadBalancerExtension(LoadBalancer loadBalancer) { + public LoadBalancerExtension(LoadBalancer loadBalancer, LoadBalancerStatsReader statsReader) { this.loadBalancer = loadBalancer; + this.statsReader = statsReader; } @Override public void run() { try { - // Start topology receiver + // Start topology receiver thread TopologyReceiver topologyReceiver = new TopologyReceiver(createMessageDelegator()); - Thread thread = new Thread(topologyReceiver); - thread.start(); + Thread topologyReceiverThread = new Thread(topologyReceiver); + topologyReceiverThread.start(); + + // Start stats notifier thread + LoadBalancerStatsNotifier statsNotifier = new LoadBalancerStatsNotifier(statsReader); + Thread statsNotifierThread = new Thread(statsNotifier); + statsNotifierThread.start(); + } catch (Exception e) { if (log.isErrorEnabled()) { log.error(e); http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/24101ae1/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsNotifier.java ---------------------------------------------------------------------- diff --git 
a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsNotifier.java b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsNotifier.java new file mode 100644 index 0000000..532ded3 --- /dev/null +++ b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsNotifier.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.stratos.load.balancer.extension.api; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.load.balancer.common.statistics.LoadBalancerStatsPublisher; +import org.apache.stratos.load.balancer.common.statistics.WSO2CEPStatsPublisher; +import org.apache.stratos.messaging.domain.topology.Cluster; +import org.apache.stratos.messaging.domain.topology.Service; +import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; + +import java.util.HashMap; +import java.util.Map; + +/** + * + */ +public class LoadBalancerStatsNotifier implements Runnable { + private static final Log log = LogFactory.getLog(LoadBalancerStatsNotifier.class); + + private LoadBalancerStatsReader statsReader; + private final LoadBalancerStatsPublisher statsPublisher; + private long statsPublisherInterval = 15000; + + public LoadBalancerStatsNotifier(LoadBalancerStatsReader statsReader) { + this.statsReader = statsReader; + this.statsPublisher = new WSO2CEPStatsPublisher(); + + String interval = System.getProperty("stats.notifier.interval"); + if (interval != null) { + statsPublisherInterval = Long.getLong(interval); + } + } + + @Override + public void run() { + while (true) { + try { + try { + Thread.sleep(statsPublisherInterval); + } catch (InterruptedException ignore) { + } + Map<String, Integer> stats = new HashMap<String, Integer>(); + for (Service service : TopologyManager.getTopology().getServices()) { + for (Cluster cluster : service.getClusters()) { + stats.put(cluster.getClusterId(), statsReader.getInFlightRequestCount(cluster.getClusterId())); + } + } + if (stats.size() > 0) { + statsPublisher.publish(stats); + } + } catch (Exception e) { + if (log.isErrorEnabled()) { + log.error("Could not publish load balancer stats", e); + } + } + } + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/24101ae1/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsReader.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsReader.java b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsReader.java new file mode 100644 index 0000000..2c6f324 --- /dev/null +++ b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsReader.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.load.balancer.extension.api; + +import java.util.HashMap; + +/** + * Load balancer statistics reader interface. + */ +public interface LoadBalancerStatsReader { + + /** + * Get in-flight request count of a given cluster. 
+ * @param clusterId + */ + int getInFlightRequestCount(String clusterId); +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/24101ae1/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java index 4a05335..e54fb60 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java @@ -24,7 +24,7 @@ import org.apache.axis2.description.TransportInDescription; import org.apache.http.protocol.HTTP; import org.apache.stratos.load.balancer.RequestDelegator; import org.apache.stratos.load.balancer.algorithm.LoadBalanceAlgorithmFactory; -import org.apache.stratos.load.balancer.common.statistics.LoadBalancingStatsCollector; +import org.apache.stratos.load.balancer.statistics.LoadBalancerStatsCollector; import org.apache.stratos.load.balancer.util.Constants; import org.apache.stratos.messaging.domain.topology.Member; import org.apache.stratos.messaging.domain.topology.Port; @@ -410,7 +410,7 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints setupLoadBalancerContextProperties(synCtx); // Update health stats - LoadBalancingStatsCollector.getInstance().incrementRequestInflightCount(currentMember.getDomain()); + LoadBalancerStatsCollector.getInstance().incrementRequestInflightCount(currentMember.getDomain()); // Set the cluster id in the message context 
synCtx.setProperty(Constants.CLUSTER_ID, currentMember.getDomain()); http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/24101ae1/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java index 9108be5..d75b460 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java @@ -18,7 +18,7 @@ */ package org.apache.stratos.load.balancer.mediators; -import org.apache.stratos.load.balancer.common.statistics.LoadBalancingStatsCollector; +import org.apache.stratos.load.balancer.statistics.LoadBalancerStatsCollector; import org.apache.stratos.load.balancer.util.Constants; import org.apache.synapse.ManagedLifecycle; import org.apache.synapse.MessageContext; @@ -36,7 +36,7 @@ public class ResponseInterceptor extends AbstractMediator implements ManagedLife log.debug("Mediation started " + ResponseInterceptor.class.getName()); } String clusterId = (String) synCtx.getProperty(Constants.CLUSTER_ID); - LoadBalancingStatsCollector.getInstance().decrementRequestInflightCount(clusterId); + LoadBalancerStatsCollector.getInstance().decrementRequestInflightCount(clusterId); return true; } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/24101ae1/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatsCollector.java ---------------------------------------------------------------------- diff --git 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatsCollector.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatsCollector.java
new file mode 100644
index 0000000..895657f
--- /dev/null
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatsCollector.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.load.balancer.statistics;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Observable;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.load.balancer.statistics.observers.WSO2CEPStatsObserver;
+
+/**
+ * Collects the in-flight request count of every cluster and periodically
+ * hands a snapshot of those counts to the registered observers.
+ * This is a Singleton object; the counters may be updated from several
+ * threads at the same time.
+ *
+ * @author nirmal
+ */
+public class LoadBalancerStatsCollector extends Observable {
+    private static final Log log = LogFactory.getLog(LoadBalancerStatsCollector.class);
+
+    /** Delay between two notifications of the observers, in milliseconds. */
+    private static final long NOTIFICATION_INTERVAL_MS = 15000;
+
+    // volatile: required for the double-checked locking in getInstance() to be safe
+    private static volatile LoadBalancerStatsCollector collector;
+
+    private final ConcurrentHashMap<String, Integer> clusterIdToRequestInflightCountMap;
+    private final Thread notifier;
+
+    private LoadBalancerStatsCollector() {
+        clusterIdToRequestInflightCountMap = new ConcurrentHashMap<String, Integer>();
+        notifier = new Thread(new ObserverNotifier());
+        notifier.start();
+    }
+
+    /**
+     * Returns the single collector instance. The first call creates it,
+     * registers the WSO2 CEP observer and starts the notifier thread.
+     *
+     * @return the shared collector, never null
+     */
+    public static LoadBalancerStatsCollector getInstance() {
+        if (collector == null) {
+            synchronized (LoadBalancerStatsCollector.class) {
+                if (collector == null) {
+                    LoadBalancerStatsCollector instance = new LoadBalancerStatsCollector();
+                    // add observers before the instance becomes visible to other threads
+                    instance.addObserver(new WSO2CEPStatsObserver());
+                    collector = instance;
+                }
+            }
+        }
+        return collector;
+    }
+
+    /**
+     * Overwrites the in-flight request count of a cluster.
+     *
+     * @param clusterId cluster to update; the call is ignored when null
+     * @param value     new in-flight request count
+     */
+    public void setRequestInflightCount(String clusterId, int value) {
+        if (clusterId == null) {
+            return;
+        }
+
+        clusterIdToRequestInflightCountMap.put(clusterId, value);
+        setChanged();
+    }
+
+    /**
+     * Adds one to the in-flight request count of a cluster.
+     *
+     * @param clusterId cluster to update; the call is ignored when null
+     */
+    public void incrementRequestInflightCount(String clusterId) {
+        incrementRequestInflightCount(clusterId, 1);
+    }
+
+    /**
+     * Adds {@code value} to the in-flight request count of a cluster.
+     *
+     * @param clusterId cluster to update; the call is ignored when null
+     * @param value     number of requests to add
+     */
+    public void incrementRequestInflightCount(String clusterId, int value) {
+        addToRequestInflightCount(clusterId, value);
+    }
+
+    /**
+     * Subtracts one from the in-flight request count of a cluster.
+     *
+     * @param clusterId cluster to update; the call is ignored when null
+     */
+    public void decrementRequestInflightCount(String clusterId) {
+        decrementRequestInflightCount(clusterId, 1);
+    }
+
+    /**
+     * Subtracts {@code value} from the in-flight request count of a cluster.
+     * The stored count never drops below zero.
+     *
+     * @param clusterId cluster to update; the call is ignored when null
+     * @param value     number of requests to subtract
+     */
+    public void decrementRequestInflightCount(String clusterId, int value) {
+        // a decrement is an addition of the negated amount
+        addToRequestInflightCount(clusterId, -value);
+    }
+
+    /**
+     * Atomically adds {@code delta} (which may be negative) to the count of
+     * a cluster, treating a missing entry as zero and clamping the result at
+     * zero because a negative number of in-flight requests is meaningless.
+     */
+    private void addToRequestInflightCount(String clusterId, int delta) {
+        if (clusterId == null) {
+            return;
+        }
+
+        // compare-and-set loop: retry whenever another thread changed the entry
+        // between reading it and writing the updated value
+        while (true) {
+            Integer current = clusterIdToRequestInflightCountMap.get(clusterId);
+            int updated = Math.max(0, (current == null ? 0 : current.intValue()) + delta);
+            boolean stored;
+            if (current == null) {
+                stored = clusterIdToRequestInflightCountMap.putIfAbsent(clusterId, updated) == null;
+            } else {
+                stored = clusterIdToRequestInflightCountMap.replace(clusterId, current, updated);
+            }
+            if (stored) {
+                break;
+            }
+        }
+        setChanged();
+    }
+
+
+    /**
+     * This thread will notify all the observers of this subject at a fixed
+     * interval until it is interrupted.
+     * @author nirmal
+     *
+     */
+    private class ObserverNotifier implements Runnable {
+
+        @Override
+        public void run() {
+            if (log.isInfoEnabled()) {
+                log.info("Load balancing statistics notifier thread started");
+            }
+            while (!Thread.currentThread().isInterrupted()) {
+                try {
+                    Thread.sleep(NOTIFICATION_INTERVAL_MS);
+                } catch (InterruptedException e) {
+                    // an interrupt is a request to stop: keep the flag set and leave the loop
+                    Thread.currentThread().interrupt();
+                    break;
+                }
+                try {
+                    // observers receive a copy, so later updates do not alter what they were given
+                    LoadBalancerStatsCollector.this.notifyObservers(
+                            new HashMap<String, Integer>(clusterIdToRequestInflightCountMap));
+                } catch (RuntimeException e) {
+                    // a failing observer must not end the periodic notification
+                    log.error("Could not notify load balancing statistics observers", e);
+                }
+            }
+            if (log.isInfoEnabled()) {
+                log.info("Load balancing statistics notifier thread stopped");
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/24101ae1/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPStatsObserver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPStatsObserver.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPStatsObserver.java
new file mode 100644
index 0000000..b2e108a
--- /dev/null
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPStatsObserver.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.load.balancer.statistics.observers;
+
+import java.util.Map;
+import java.util.Observable;
+import java.util.Observer;
+
+import org.apache.stratos.load.balancer.common.statistics.WSO2CEPStatsPublisher;
+
+/**
+ * Observer that passes every statistics map it is notified with on to a
+ * {@link WSO2CEPStatsPublisher}.
+ */
+public class WSO2CEPStatsObserver implements Observer {
+    private final WSO2CEPStatsPublisher statsPublisher = new WSO2CEPStatsPublisher();
+
+    /**
+     * Publishes the notification argument when it is a map and ignores the
+     * notification otherwise.
+     *
+     * @param arg0 the observable that sent the notification (not used)
+     * @param arg1 the notification argument
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public void update(Observable arg0, Object arg1) {
+        // instanceof is false for null, so a missing argument is skipped here as well
+        if (!(arg1 instanceof Map<?, ?>)) {
+            return;
+        }
+        statsPublisher.publish((Map<String, Integer>) arg1);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/24101ae1/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatsReader.java
----------------------------------------------------------------------
diff --git a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatsReader.java b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatsReader.java
new file mode 100644
index 0000000..d76cc0b
--- /dev/null
+++ b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatsReader.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.haproxy.extension; + +import org.apache.stratos.load.balancer.extension.api.LoadBalancerStatsReader; + +/** + * HAProxy statistics reader. + * + * {@link #getInFlightRequestCount(String)} currently returns 0 for every cluster id and does not use its argument. + * NOTE(review): this looks like a placeholder until statistics are actually read from HAProxy - confirm. + */ +public class HAProxyStatsReader implements LoadBalancerStatsReader { + + @Override + public int getInFlightRequestCount(String clusterId) { + return 0; + } +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/24101ae1/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java ---------------------------------------------------------------------- diff --git a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java index 979f59b..ef586f5 100644 --- a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java +++ b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java @@ -56,7 +56,8 @@ public class Main { } HAProxy haProxy = new HAProxy(executableFilePath, templatePath, templateName, confFilePath); - LoadBalancerExtension extension = new LoadBalancerExtension(haProxy); + HAProxyStatsReader statsReader 
= new HAProxyStatsReader(); + LoadBalancerExtension extension = new LoadBalancerExtension(haProxy, statsReader); Thread thread = new Thread(extension); thread.start(); }
