Updated Branches: refs/heads/master 5f9188065 -> 6db5968e4
Fixed load balancer statistics publisher issue of not publishing statistics periodically when there are no incoming requests found Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/6db5968e Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/6db5968e Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/6db5968e Branch: refs/heads/master Commit: 6db5968e4d0c9c58f3ebd7f03261f61da48f3b86 Parents: 5f91880 Author: Imesh Gunaratne <[email protected]> Authored: Fri Dec 27 15:32:38 2013 +0530 Committer: Imesh Gunaratne <[email protected]> Committed: Fri Dec 27 15:32:38 2013 +0530 ---------------------------------------------------------------------- .../LoadBalancerStatisticsReader.java | 34 +++++ .../LoadBalancerStatisticsNotifier.java | 109 +++++++++++++++ .../extension/api/LoadBalancerExtension.java | 14 +- ...oadBalancerInFlightRequestCountNotifier.java | 101 -------------- .../extension/api/LoadBalancerStatsReader.java | 34 ----- .../TenantAwareLoadBalanceEndpoint.java | 4 +- .../internal/LoadBalancerServiceComponent.java | 19 +++ .../balancer/mediators/ResponseInterceptor.java | 4 +- ...adBalancerInFlightRequestCountCollector.java | 136 ------------------- .../LoadBalancerStatisticsCollector.java | 101 ++++++++++++++ .../WSO2CEPInFlightRequestCountObserver.java | 66 --------- .../extension/HAProxyStatisticsReader.java | 96 +++++++++++++ .../haproxy/extension/HAProxyStatsReader.java | 96 ------------- .../apache/stratos/haproxy/extension/Main.java | 2 +- 14 files changed, 372 insertions(+), 444 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6db5968e/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java new file mode 100644 index 0000000..41e81e8 --- /dev/null +++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.load.balancer.common.statistics; + +import java.util.HashMap; + +/** + * Load balancer statistics reader interface. + */ +public interface LoadBalancerStatisticsReader { + + /** + * Get in-flight request count of a given cluster. + * @param clusterId + */ + int getInFlightRequestCount(String clusterId); +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6db5968e/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java new file mode 100644 index 0000000..88cfd4d --- /dev/null +++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.load.balancer.common.statistics.notifier; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.load.balancer.common.statistics.LoadBalancerStatisticsReader; +import org.apache.stratos.load.balancer.common.statistics.publisher.WSO2CEPInFlightRequestPublisher; +import org.apache.stratos.messaging.domain.topology.Cluster; +import org.apache.stratos.messaging.domain.topology.Service; +import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; + +/** + * Load balancer statistics notifier thread for publishing statistics periodically to CEP. + */ +public class LoadBalancerStatisticsNotifier implements Runnable { + private static final Log log = LogFactory.getLog(LoadBalancerStatisticsNotifier.class); + + private final LoadBalancerStatisticsReader statsReader; + private final WSO2CEPInFlightRequestPublisher inFlightRequestPublisher; + private long statsPublisherInterval = 15000; + private String networkPartitionId; + private boolean terminated; + + public LoadBalancerStatisticsNotifier(LoadBalancerStatisticsReader statsReader) { + this.statsReader = statsReader; + this.inFlightRequestPublisher = new WSO2CEPInFlightRequestPublisher(); + + String interval = System.getProperty("stats.notifier.interval"); + if (interval != null) { + statsPublisherInterval = Long.getLong(interval); + } + if(log.isDebugEnabled()) { + log.debug(String.format("stats.notifier.interval: %dms", statsPublisherInterval)); + } + + networkPartitionId = System.getProperty("network.partition.id"); + if (StringUtils.isBlank(networkPartitionId)) { + throw new RuntimeException("network.partition.id system property was not found."); + } + } + + @Override + public void run() { + while (!terminated) { + try { + try { + Thread.sleep(statsPublisherInterval); + } catch (InterruptedException ignore) { + } + + if (log.isDebugEnabled()) { + log.debug("Publishing load balancer statistics"); + } + if (inFlightRequestPublisher.isEnabled()) { + try { + TopologyManager.acquireReadLock(); + int requestCount; + for (Service service : TopologyManager.getTopology().getServices()) { + for (Cluster cluster : service.getClusters()) { + // Publish in-flight request count of load balancer's network partition + requestCount = statsReader.getInFlightRequestCount(cluster.getClusterId()); + inFlightRequestPublisher.publish(cluster.getClusterId(), networkPartitionId, requestCount); + if (log.isDebugEnabled()) { + log.debug(String.format("In-flight request count published to cep: [cluster-id] %s [network-partition] %s [value] %d", + cluster.getClusterId(), networkPartitionId, requestCount)); + } + } + + } + } finally { + TopologyManager.releaseReadLock(); + } + } else if (log.isWarnEnabled()) { + log.warn("In-flight request count publisher is disabled"); + } + } catch (Exception e) { + if (log.isErrorEnabled()) { + log.error("Could not publish load balancer statistics", e); + } + } + } + } + + /** + * Terminate load balancer statistics notifier thread. + */ + public void terminate() { + terminated = true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6db5968e/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java index 0e602c7..9fe5107 100644 --- a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java +++ b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java @@ -21,6 +21,8 @@ package org.apache.stratos.load.balancer.extension.api; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.stratos.load.balancer.common.statistics.LoadBalancerStatisticsReader; +import org.apache.stratos.load.balancer.common.statistics.notifier.LoadBalancerStatisticsNotifier; import org.apache.stratos.messaging.event.Event; import org.apache.stratos.messaging.listener.topology.*; import org.apache.stratos.messaging.message.processor.topology.TopologyMessageProcessorChain; @@ -36,13 +38,13 @@ public class LoadBalancerExtension implements Runnable { private static final Log log = LogFactory.getLog(LoadBalancerExtension.class); private LoadBalancer loadBalancer; - private LoadBalancerStatsReader statsReader; + private LoadBalancerStatisticsReader statsReader; private boolean loadBalancerStarted; private TopologyReceiver topologyReceiver; - private LoadBalancerInFlightRequestCountNotifier inFlightRequestCountNotifier; + private LoadBalancerStatisticsNotifier statisticsNotifier; private boolean terminated; - public LoadBalancerExtension(LoadBalancer loadBalancer, LoadBalancerStatsReader statsReader) { + public LoadBalancerExtension(LoadBalancer loadBalancer, LoadBalancerStatisticsReader statsReader) { this.loadBalancer = loadBalancer; this.statsReader = statsReader; } @@ -60,8 +62,8 @@ public class LoadBalancerExtension implements Runnable { topologyReceiverThread.start(); // Start stats notifier thread - inFlightRequestCountNotifier = new LoadBalancerInFlightRequestCountNotifier(statsReader); - Thread statsNotifierThread = new Thread(inFlightRequestCountNotifier); + statisticsNotifier = new LoadBalancerStatisticsNotifier(statsReader); + Thread statsNotifierThread = new Thread(statisticsNotifier); statsNotifierThread.start(); // Keep the thread live until terminated @@ -146,7 +148,7 @@ public class LoadBalancerExtension implements Runnable { public void terminate() { topologyReceiver.terminate(); - inFlightRequestCountNotifier.terminate(); + statisticsNotifier.terminate(); terminated = true; } } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6db5968e/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerInFlightRequestCountNotifier.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerInFlightRequestCountNotifier.java b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerInFlightRequestCountNotifier.java deleted file mode 100644 index c5bf52b..0000000 --- a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerInFlightRequestCountNotifier.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.stratos.load.balancer.extension.api; - -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.load.balancer.common.statistics.publisher.WSO2CEPInFlightRequestPublisher; -import org.apache.stratos.messaging.domain.topology.Cluster; -import org.apache.stratos.messaging.domain.topology.Service; -import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; - -/** - * Load balancer statistics notifier thread for publishing statistics periodically to CEP. - */ -public class LoadBalancerInFlightRequestCountNotifier implements Runnable { - private static final Log log = LogFactory.getLog(LoadBalancerInFlightRequestCountNotifier.class); - - private LoadBalancerStatsReader statsReader; - private final WSO2CEPInFlightRequestPublisher statsPublisher; - private long statsPublisherInterval = 15000; - private String networkPartitionId; - private boolean terminated; - - public LoadBalancerInFlightRequestCountNotifier(LoadBalancerStatsReader statsReader) { - this.statsReader = statsReader; - this.statsPublisher = new WSO2CEPInFlightRequestPublisher(); - - String interval = System.getProperty("stats.notifier.interval"); - if (interval != null) { - statsPublisherInterval = Long.getLong(interval); - } - networkPartitionId = System.getProperty("network.partition.id"); - if (StringUtils.isBlank(networkPartitionId)) { - throw new RuntimeException("network.partition.id system property was not found."); - } - } - - @Override - public void run() { - while (!terminated) { - try { - try { - Thread.sleep(statsPublisherInterval); - } catch (InterruptedException ignore) { - } - - if (statsPublisher.isEnabled()) { - try { - TopologyManager.acquireReadLock(); - int requestCount; - for (Service service : TopologyManager.getTopology().getServices()) { - for (Cluster cluster : service.getClusters()) { - // Publish in-flight request count of load balancer's network partition - requestCount = statsReader.getInFlightRequestCount(cluster.getClusterId()); - statsPublisher.publish(cluster.getClusterId(), networkPartitionId, requestCount); - if (log.isDebugEnabled()) { - log.debug(String.format("In-flight request count published to cep: [cluster-id] %s [network-partition] %s [value] %d", - cluster.getClusterId(), networkPartitionId, requestCount)); - } - } - - } - } finally { - TopologyManager.releaseReadLock(); - } - } else if (log.isWarnEnabled()) { - log.warn("CEP statistics publisher is disabled"); - } - } catch (Exception e) { - if (log.isErrorEnabled()) { - log.error("Could not publish in-flight request count", e); - } - } - } - } - - /** - * Terminate load balancer statistics notifier thread. - */ - public void terminate() { - terminated = true; - } -} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6db5968e/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsReader.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsReader.java b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsReader.java deleted file mode 100644 index 2c6f324..0000000 --- a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsReader.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.stratos.load.balancer.extension.api; - -import java.util.HashMap; - -/** - * Load balancer statistics reader interface. - */ -public interface LoadBalancerStatsReader { - - /** - * Get in-flight request count of a given cluster. - * @param clusterId - */ - int getInFlightRequestCount(String clusterId); -} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6db5968e/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java index b6fe08b..efc88c9 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java @@ -27,7 +27,7 @@ import org.apache.stratos.load.balancer.RequestDelegator; import org.apache.stratos.load.balancer.algorithm.LoadBalanceAlgorithmFactory; import org.apache.stratos.load.balancer.conf.LoadBalancerConfiguration; import org.apache.stratos.load.balancer.conf.domain.TenantIdentifier; -import org.apache.stratos.load.balancer.statistics.LoadBalancerInFlightRequestCountCollector; +import org.apache.stratos.load.balancer.statistics.LoadBalancerStatisticsCollector; import org.apache.stratos.load.balancer.util.Constants; import org.apache.stratos.messaging.domain.tenant.Tenant; import org.apache.stratos.messaging.domain.topology.Member; @@ -526,7 +526,7 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints if(StringUtils.isBlank(clusterId)) { throw new RuntimeException("Cluster id not found in message context"); } - LoadBalancerInFlightRequestCountCollector.getInstance().incrementInFlightRequestCount(clusterId); + LoadBalancerStatisticsCollector.getInstance().incrementInFlightRequestCount(clusterId); } catch (Exception e) { if(log.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6db5968e/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java index 2d11458..da7f3de 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java @@ -27,12 +27,15 @@ import org.apache.stratos.load.balancer.EndpointDeployer; import org.apache.stratos.load.balancer.LoadBalancerTenantReceiver; import org.apache.stratos.load.balancer.LoadBalancerTopologyReceiver; import org.apache.stratos.load.balancer.TenantAwareLoadBalanceEndpointException; +import org.apache.stratos.load.balancer.common.statistics.LoadBalancerStatisticsReader; +import org.apache.stratos.load.balancer.common.statistics.notifier.LoadBalancerStatisticsNotifier; import org.apache.stratos.load.balancer.conf.LoadBalancerConfiguration; import org.apache.stratos.load.balancer.conf.configurator.CEPConfigurator; import org.apache.stratos.load.balancer.conf.configurator.JndiConfigurator; import org.apache.stratos.load.balancer.conf.configurator.SynapseConfigurator; import org.apache.stratos.load.balancer.conf.configurator.TopologyFilterConfigurator; import org.apache.stratos.load.balancer.context.LoadBalancerContext; +import org.apache.stratos.load.balancer.statistics.LoadBalancerStatisticsCollector; import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter; @@ -100,6 +103,7 @@ public class LoadBalancerServiceComponent { private boolean activated = false; private LoadBalancerTopologyReceiver topologyReceiver; private LoadBalancerTenantReceiver tenantReceiver; + private LoadBalancerStatisticsNotifier statisticsNotifier; protected void activate(ComponentContext ctxt) { try { @@ -179,6 +183,19 @@ public class LoadBalancerServiceComponent { } } + if(configuration.isCepStatsPublisherEnabled()) { + // Get stats reader + LoadBalancerStatisticsReader statsReader = LoadBalancerStatisticsCollector.getInstance(); + + // Start stats notifier thread + statisticsNotifier = new LoadBalancerStatisticsNotifier(statsReader); + Thread statsNotifierThread = new Thread(statisticsNotifier); + statsNotifierThread.start(); + if (log.isInfoEnabled()) { + log.info("Load balancer statistics notifier thread started"); + } + } + activated = true; if (log.isInfoEnabled()) { log.info("Load balancer service component is activated "); @@ -206,6 +223,8 @@ public class LoadBalancerServiceComponent { tenantReceiver.terminate(); // Terminate topology receiver topologyReceiver.terminate(); + // Terminate statistics notifier + statisticsNotifier.terminate(); } /** http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6db5968e/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java index fef3fea..cf3e768 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java @@ -19,7 +19,7 @@ package org.apache.stratos.load.balancer.mediators; import org.apache.commons.lang3.StringUtils; -import org.apache.stratos.load.balancer.statistics.LoadBalancerInFlightRequestCountCollector; +import org.apache.stratos.load.balancer.statistics.LoadBalancerStatisticsCollector; import org.apache.stratos.load.balancer.util.Constants; import org.apache.synapse.ManagedLifecycle; import org.apache.synapse.MessageContext; @@ -40,7 +40,7 @@ public class ResponseInterceptor extends AbstractMediator implements ManagedLife if (StringUtils.isBlank(clusterId)) { throw new RuntimeException("Cluster id not found in message context"); } - LoadBalancerInFlightRequestCountCollector.getInstance().decrementInFlightRequestCount(clusterId); + LoadBalancerStatisticsCollector.getInstance().decrementInFlightRequestCount(clusterId); } catch (Exception e) { if(log.isErrorEnabled()) { log.error("Could not decrement in-flight request count", e); http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6db5968e/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerInFlightRequestCountCollector.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerInFlightRequestCountCollector.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerInFlightRequestCountCollector.java deleted file mode 100644 index 3c8927a..0000000 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerInFlightRequestCountCollector.java +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.stratos.load.balancer.statistics; - -import org.apache.commons.lang.StringUtils; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.load.balancer.statistics.observers.WSO2CEPInFlightRequestCountObserver; - -import java.util.HashMap; -import java.util.Map; -import java.util.Observable; -import java.util.concurrent.ConcurrentHashMap; - -/** - * This is the load balancing in-flight request count collector and any observer can get registered here - * and receive notifications periodically. - * This is a Singleton object. - * - * @author nirmal - */ -public class LoadBalancerInFlightRequestCountCollector extends Observable { - private static final Log log = LogFactory.getLog(LoadBalancerInFlightRequestCountCollector.class); - - private static LoadBalancerInFlightRequestCountCollector collector; - // Map<ClusterId, Map<PartitionId, InFlightRequestCount> - private Map<String, Integer> inFlightRequestCountMap; - private Thread notifier; - - private LoadBalancerInFlightRequestCountCollector() { - inFlightRequestCountMap = new ConcurrentHashMap<String, Integer>(); - if (notifier == null || (notifier != null && !notifier.isAlive())) { - notifier = new Thread(new ObserverNotifier()); - notifier.start(); - } - } - - public static LoadBalancerInFlightRequestCountCollector getInstance() { - if (collector == null) { - synchronized (LoadBalancerInFlightRequestCountCollector.class) { - if (collector == null) { - collector = new LoadBalancerInFlightRequestCountCollector(); - // add observers - collector.addObserver(new WSO2CEPInFlightRequestCountObserver()); - } - } - } - return collector; - } - - public int getInFlightRequestCount(String clusterId) { - if (inFlightRequestCountMap.containsKey(clusterId)) { - return inFlightRequestCountMap.get(clusterId); - } - return 0; - } - - public void setInFlightRequestCount(String clusterId, int value) { - if (StringUtils.isBlank(clusterId)) { - return; - } - - inFlightRequestCountMap.put(clusterId, value); - if(log.isDebugEnabled()) { - log.debug(String.format("In-flight request count updated: [cluster] %s [value] %d", clusterId, value)); - } - setChanged(); - } - - public void incrementInFlightRequestCount(String clusterId) { - incrementInFlightRequestCount(clusterId, 1); - } - - public void incrementInFlightRequestCount(String clusterId, int value) { - if (StringUtils.isBlank(clusterId)) { - return; - } - - int count = getInFlightRequestCount(clusterId); - setInFlightRequestCount(clusterId, (count + value)); - } - - public void decrementInFlightRequestCount(String clusterId) { - decrementInFlightRequestCount(clusterId, 1); - } - - public void decrementInFlightRequestCount(String clusterId, int value) { - if (StringUtils.isBlank(clusterId)) { - return; - } - - int count = getInFlightRequestCount(clusterId); - int newValue = (count - value) < 0 ? 0 : (count - value); - setInFlightRequestCount(clusterId, newValue); - } - - - /** - * This thread will notify all the observers of this subject. - * - * @author nirmal - */ - private class ObserverNotifier implements Runnable { - - @Override - public void run() { - if (log.isInfoEnabled()) { - log.info("Load balancing statistics notifier thread started"); - } - while (true) { - try { - Thread.sleep(15000); - } catch (InterruptedException ignore) { - } - LoadBalancerInFlightRequestCountCollector.getInstance().notifyObservers(new HashMap<String, Integer>(inFlightRequestCountMap)); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6db5968e/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java new file mode 100644 index 0000000..72186fe --- /dev/null +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.load.balancer.statistics; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.load.balancer.common.statistics.LoadBalancerStatisticsReader; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * This is the load balancer statistics collector. + */ +public class LoadBalancerStatisticsCollector implements LoadBalancerStatisticsReader { + private static final Log log = LogFactory.getLog(LoadBalancerStatisticsCollector.class); + + private static volatile LoadBalancerStatisticsCollector instance; + // Map<ClusterId, Map<PartitionId, InFlightRequestCount> + private Map<String, Integer> inFlightRequestCountMap; + + private LoadBalancerStatisticsCollector() { + inFlightRequestCountMap = new ConcurrentHashMap<String, Integer>(); + } + + public static LoadBalancerStatisticsCollector getInstance() { + if (instance == null) { + synchronized (LoadBalancerStatisticsCollector.class) { + if (instance == null) { + if(log.isDebugEnabled()) { + log.debug("Load balancer in-flight request count collector instance created"); + } + instance = new LoadBalancerStatisticsCollector(); + } + } + } + return instance; + } + + public int getInFlightRequestCount(String clusterId) { + if (inFlightRequestCountMap.containsKey(clusterId)) { + return inFlightRequestCountMap.get(clusterId); + } + return 0; + } + + public void setInFlightRequestCount(String clusterId, int value) { + if (StringUtils.isBlank(clusterId)) { + return; + } + + inFlightRequestCountMap.put(clusterId, value); + if (log.isDebugEnabled()) { + log.debug(String.format("In-flight request count updated: [cluster] %s [value] %d", clusterId, value)); + } + } + + public void incrementInFlightRequestCount(String clusterId) { + incrementInFlightRequestCount(clusterId, 1); + } + + public void incrementInFlightRequestCount(String clusterId, int value) { + if (StringUtils.isBlank(clusterId)) { + return; + } + + int count = getInFlightRequestCount(clusterId); + setInFlightRequestCount(clusterId, (count + value)); + } + + public void decrementInFlightRequestCount(String clusterId) { + decrementInFlightRequestCount(clusterId, 1); + } + + public void decrementInFlightRequestCount(String clusterId, int value) { + if (StringUtils.isBlank(clusterId)) { + return; + } + + int count = getInFlightRequestCount(clusterId); + int newValue = (count - value) < 0 ? 0 : (count - value); + setInFlightRequestCount(clusterId, newValue); + } +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6db5968e/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPInFlightRequestCountObserver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPInFlightRequestCountObserver.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPInFlightRequestCountObserver.java deleted file mode 100644 index 47a2602..0000000 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPInFlightRequestCountObserver.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.stratos.load.balancer.statistics.observers; - -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.load.balancer.common.statistics.publisher.WSO2CEPInFlightRequestPublisher; - -import java.util.Map; -import java.util.Observable; -import java.util.Observer; - -public class WSO2CEPInFlightRequestCountObserver implements Observer { - private static final Log log = LogFactory.getLog(WSO2CEPInFlightRequestCountObserver.class); - private WSO2CEPInFlightRequestPublisher publisher; - private String networkPartitionId; - - public WSO2CEPInFlightRequestCountObserver() { - this.publisher = new WSO2CEPInFlightRequestPublisher(); - networkPartitionId = System.getProperty("network.partition.id"); - if (StringUtils.isBlank(networkPartitionId)) { - throw new RuntimeException("network.partition.id system property was not found."); - } - } - - public void update(Observable observable, Object object) { - try { - if (publisher.isEnabled()) { - // Map<ClusterId, Count> - Map<String, Integer> inFlightRequestCountMap = (Map<String, Integer>) object; - // Publish event per cluster id - for (String clusterId : inFlightRequestCountMap.keySet()) { - // Publish event - publisher.publish(clusterId, networkPartitionId, inFlightRequestCountMap.get(clusterId)); - if (log.isDebugEnabled()) { - log.debug(String.format("In-flight request count published to cep: [cluster-id] %s [network-partition] %s [value] %d", - clusterId, networkPartitionId, inFlightRequestCountMap.get(clusterId))); - } - } - } else if (log.isWarnEnabled()) { - log.warn("CEP statistics publisher is disabled"); - } - } catch (Exception e) { - if (log.isErrorEnabled()) { - log.error("Could not publish in-flight request count to cep", e); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6db5968e/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java ---------------------------------------------------------------------- diff --git a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java new file mode 100644 index 0000000..24fc423 --- /dev/null +++ b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.haproxy.extension; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.load.balancer.common.statistics.LoadBalancerStatisticsReader; +import org.apache.stratos.messaging.domain.topology.Cluster; +import org.apache.stratos.messaging.domain.topology.Member; +import org.apache.stratos.messaging.domain.topology.Port; +import org.apache.stratos.messaging.domain.topology.Service; +import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; + +import java.io.IOException; + +/** + * HAProxy statistics reader. + */ +public class HAProxyStatisticsReader implements LoadBalancerStatisticsReader { + private static final Log log = LogFactory.getLog(HAProxyStatisticsReader.class); + + private String scriptsPath; + private String statsSocketFilePath; + + public HAProxyStatisticsReader() { + this.scriptsPath = HAProxyContext.getInstance().getScriptsPath(); + this.statsSocketFilePath = HAProxyContext.getInstance().getStatsSocketFilePath(); + } + + @Override + public int getInFlightRequestCount(String clusterId) { + String frontendId, backendId, command, output; + String[] array; + int totalWeight, weight; + + for (Service service : TopologyManager.getTopology().getServices()) { + for (Cluster cluster : service.getClusters()) { + if (cluster.getClusterId().equals(clusterId)) { + totalWeight = 0; + if ((service.getPorts() == null) || (service.getPorts().size() == 0)) { + throw new RuntimeException(String.format("No ports found in service: %s", service.getServiceName())); + } + + for (Port port : service.getPorts()) { + frontendId = cluster.getClusterId() + "-proxy-" + port.getProxy(); + backendId = frontendId + "-members"; + + for (Member member : cluster.getMembers()) { + // echo "get weight <backend>/<server>" | socat stdio <stats-socket> + command = String.format("%s/get-weight.sh %s %s %s", scriptsPath, backendId, member.getMemberId(), statsSocketFilePath); + try { + output = CommandUtil.executeCommand(command); + if ((output != null) && (output.length() > 0)) { + array = output.split(" "); + if ((array != null) && (array.length > 0)) { + weight = Integer.parseInt(array[0]); + if (log.isDebugEnabled()) { + log.debug(String.format("Member weight found: [cluster] %s [member] %s [weight] %d", member.getClusterId(), member.getMemberId(), weight)); + } + totalWeight += weight; + } + } + } catch (IOException e) { + if (log.isErrorEnabled()) { + log.error(e); + } + } + } + } + if (log.isInfoEnabled()) { + log.info(String.format("Cluster weight found: [cluster] %s [weight] %d", cluster.getClusterId(), totalWeight)); + } + return totalWeight; + } + } + } + return 0; + } +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6db5968e/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatsReader.java ---------------------------------------------------------------------- diff --git a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatsReader.java b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatsReader.java deleted file mode 100644 index 57c6bc0..0000000 --- a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatsReader.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.stratos.haproxy.extension; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.load.balancer.extension.api.LoadBalancerStatsReader; -import org.apache.stratos.messaging.domain.topology.Cluster; -import org.apache.stratos.messaging.domain.topology.Member; -import org.apache.stratos.messaging.domain.topology.Port; -import org.apache.stratos.messaging.domain.topology.Service; -import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; - -import java.io.IOException; - -/** - * HAProxy statistics reader. - */ -public class HAProxyStatsReader implements LoadBalancerStatsReader { - private static final Log log = LogFactory.getLog(HAProxyStatsReader.class); - - private String scriptsPath; - private String statsSocketFilePath; - - public HAProxyStatsReader() { - this.scriptsPath = HAProxyContext.getInstance().getScriptsPath(); - this.statsSocketFilePath = HAProxyContext.getInstance().getStatsSocketFilePath(); - } - - @Override - public int getInFlightRequestCount(String clusterId) { - String frontendId, backendId, command, output; - String[] array; - int totalWeight, weight; - - for (Service service : TopologyManager.getTopology().getServices()) { - for (Cluster cluster : service.getClusters()) { - if (cluster.getClusterId().equals(clusterId)) { - totalWeight = 0; - if ((service.getPorts() == null) || (service.getPorts().size() == 0)) { - throw new RuntimeException(String.format("No ports found in service: %s", service.getServiceName())); - } - - for (Port port : service.getPorts()) { - frontendId = cluster.getClusterId() + "-proxy-" + port.getProxy(); - backendId = frontendId + "-members"; - - for (Member member : cluster.getMembers()) { - // echo "get weight <backend>/<server>" | socat stdio <stats-socket> - command = String.format("%s/get-weight.sh %s %s %s", scriptsPath, backendId, member.getMemberId(), statsSocketFilePath); - try { - output = CommandUtil.executeCommand(command); - if ((output != null) && (output.length() > 0)) { - array = output.split(" "); - if ((array != null) && (array.length > 0)) { - weight = Integer.parseInt(array[0]); - if (log.isDebugEnabled()) { - log.debug(String.format("Member weight found: [cluster] %s [member] %s [weight] %d", member.getClusterId(), member.getMemberId(), weight)); - } - totalWeight += weight; - } - } - } catch (IOException e) { - if (log.isErrorEnabled()) { - log.error(e); - } - } - } - } - if (log.isInfoEnabled()) { - log.info(String.format("Cluster weight found: [cluster] %s [weight] %d", cluster.getClusterId(), totalWeight)); - } - return totalWeight; - } - } - } - return 0; - } -} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6db5968e/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java ---------------------------------------------------------------------- diff --git a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java index 176da20..d2a8731 100644 --- a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java +++ b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java @@ -43,7 +43,7 @@ public class Main { // Validate runtime parameters HAProxyContext.getInstance().validate(); - extension = new LoadBalancerExtension(new HAProxy(), new HAProxyStatsReader()); + extension = new LoadBalancerExtension(new HAProxy(), new HAProxyStatisticsReader()); Thread thread = new Thread(extension); thread.start(); } catch (Exception e) {
