Updated Branches: refs/heads/master 2fca73b39 -> 9ced6550c
Refactored cep statistics publisher and added load balancer faulty member statistics publisher Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/9ced6550 Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/9ced6550 Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/9ced6550 Branch: refs/heads/master Commit: 9ced6550c30fd6de212400d4e1d231ba5d998e68 Parents: 2fca73b Author: Imesh Gunaratne <[email protected]> Authored: Thu Dec 5 16:56:21 2013 +0530 Committer: Imesh Gunaratne <[email protected]> Committed: Thu Dec 5 16:56:21 2013 +0530 ---------------------------------------------------------------------- .../statistics/LoadBalancerStatsPublisher.java | 6 +- .../WSO2CEPFaultyMemberPublisher.java | 74 +++++++++++ .../WSO2CEPInFlightRequestPublisher.java | 74 +++++++++++ .../statistics/WSO2CEPStatsPublisher.java | 80 +++++------- .../extension/api/LoadBalancerExtension.java | 4 +- ...oadBalancerInFlightRequestCountNotifier.java | 82 ++++++++++++ .../api/LoadBalancerStatsNotifier.java | 90 ------------- .../TenantAwareLoadBalanceEndpoint.java | 4 +- .../balancer/mediators/ResponseInterceptor.java | 4 +- ...adBalancerInFlightRequestCountCollector.java | 128 +++++++++++++++++++ .../statistics/LoadBalancerStatsCollector.java | 128 ------------------- .../WSO2CEPInFlightRequestCountObserver.java | 55 ++++++++ .../observers/WSO2CEPStatsObserver.java | 48 ------- 13 files changed, 454 insertions(+), 323 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9ced6550/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatsPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatsPublisher.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatsPublisher.java index 46f9b35..298087a 100644 --- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatsPublisher.java +++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatsPublisher.java @@ -38,8 +38,8 @@ public interface LoadBalancerStatsPublisher { boolean isEnabled(); /** - * Publish statistics as a map of Cluster Id, In-flight Request Count. - * @param stats + * Payload to be published. + * @param payload An array of parameter values. */ - void publish(Map<String, Integer> stats); + void publish(Object[] payload); } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9ced6550/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPFaultyMemberPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPFaultyMemberPublisher.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPFaultyMemberPublisher.java new file mode 100644 index 0000000..fc6b65a --- /dev/null +++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPFaultyMemberPublisher.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.load.balancer.common.statistics; + +import org.wso2.carbon.databridge.commons.Attribute; +import org.wso2.carbon.databridge.commons.AttributeType; +import org.wso2.carbon.databridge.commons.StreamDefinition; + +import java.util.ArrayList; +import java.util.List; + +/** + * WSO2 CEP faulty member publisher. + * + * Faulty members: + * If a request was rejected by some of the members in a cluster while at least + * one member could serve it, those members could be identified as faulty. + */ +public class WSO2CEPFaultyMemberPublisher extends WSO2CEPStatsPublisher { + + private static final String DATA_STREAM_NAME = "stratos.lb.faulty.members"; + private static final String VERSION = "1.0.0"; + + private static StreamDefinition createStreamDefinition() { + try { + StreamDefinition streamDefinition = new StreamDefinition(DATA_STREAM_NAME, VERSION); + streamDefinition.setNickName("lb fault members"); + streamDefinition.setDescription("lb fault members"); + List<Attribute> payloadData = new ArrayList<Attribute>(); + // Payload definition + payloadData.add(new Attribute("cluster_id", AttributeType.STRING)); + payloadData.add(new Attribute("member_id", AttributeType.STRING)); + streamDefinition.setPayloadData(payloadData); + return streamDefinition; + } + catch (Exception e) { + throw new RuntimeException("Could not create stream definition", e); + } + } + + public WSO2CEPFaultyMemberPublisher() { + super(createStreamDefinition()); + } + + /** + * Publish faulty members. + * @param clusterId + * @param memberId + */ + public void publish(String clusterId, String memberId) { + List<Object> payload = new ArrayList<Object>(); + // Payload values + payload.add(clusterId); + payload.add(memberId); + super.publish(payload.toArray()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9ced6550/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPInFlightRequestPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPInFlightRequestPublisher.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPInFlightRequestPublisher.java new file mode 100644 index 0000000..f10907e --- /dev/null +++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPInFlightRequestPublisher.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.load.balancer.common.statistics; + +import org.wso2.carbon.databridge.commons.Attribute; +import org.wso2.carbon.databridge.commons.AttributeType; +import org.wso2.carbon.databridge.commons.StreamDefinition; + +import java.util.ArrayList; +import java.util.List; + +/** + * WSO2 CEP in flight request count publisher. + * + * In-flight request count: + * Number of requests being served at a given moment could be identified as + * in-flight request count. + */ +public class WSO2CEPInFlightRequestPublisher extends WSO2CEPStatsPublisher { + + private static final String DATA_STREAM_NAME = "stratos.lb.stats"; + private static final String VERSION = "1.0.0"; + + private static StreamDefinition createStreamDefinition() { + try { + StreamDefinition streamDefinition = new StreamDefinition(DATA_STREAM_NAME, VERSION); + streamDefinition.setNickName("lb stats"); + streamDefinition.setDescription("lb stats"); + List<Attribute> payloadData = new ArrayList<Attribute>(); + // Payload definition + payloadData.add(new Attribute("cluster_id", AttributeType.STRING)); + payloadData.add(new Attribute("in_flight_requests", AttributeType.INT)); + streamDefinition.setPayloadData(payloadData); + return streamDefinition; + } + catch (Exception e) { + throw new RuntimeException("Could not create stream definition", e); + } + } + + public WSO2CEPInFlightRequestPublisher() { + super(createStreamDefinition()); + } + + /** + * Publish in-flight request count of a cluster. + * @param clusterId + * @param inFlightRequestCount + */ + public void publish(String clusterId, int inFlightRequestCount) { + List<Object> payload = new ArrayList<Object>(); + // Payload values + payload.add(clusterId); + payload.add(inFlightRequestCount); + super.publish(payload.toArray()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9ced6550/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPStatsPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPStatsPublisher.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPStatsPublisher.java index cb24a2a..6d013ed 100644 --- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPStatsPublisher.java +++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPStatsPublisher.java @@ -27,10 +27,10 @@ import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher; import org.wso2.carbon.databridge.agent.thrift.conf.AgentConfiguration; import org.wso2.carbon.databridge.agent.thrift.exception.AgentException; import org.wso2.carbon.databridge.commons.Event; +import org.wso2.carbon.databridge.commons.StreamDefinition; import org.wso2.carbon.utils.CarbonUtils; import java.io.File; -import java.util.Arrays; import java.util.HashMap; import java.util.Map; @@ -40,20 +40,26 @@ import java.util.Map; public class WSO2CEPStatsPublisher implements LoadBalancerStatsPublisher { private static final Log log = LogFactory.getLog(WSO2CEPStatsPublisher.class); - private static final String CALL_CENTER_DATA_STREAM = "stratos.lb.stats"; - private static final String VERSION = "1.0.0"; + private StreamDefinition streamDefinition; private AsyncDataPublisher asyncDataPublisher; private String ip; private String port; + private String username; + private String password; private boolean enabled = false; - public WSO2CEPStatsPublisher() { - ip = System.getProperty("thrift.receiver.ip"); - port = System.getProperty("thrift.receiver.port"); - enabled = Boolean.getBoolean("load.balancer.cep.stats.publisher.enabled"); - - if(enabled) { - init(); + public WSO2CEPStatsPublisher(StreamDefinition streamDefinition) { + this.streamDefinition = streamDefinition; + this.ip = System.getProperty("thrift.receiver.ip"); + this.port = System.getProperty("thrift.receiver.port"); + this.username = "admin"; + this.password = "admin"; + String enabledStr = System.getProperty("load.balancer.cep.stats.publisher.enabled"); + if (StringUtils.isNotBlank(enabledStr)) { + enabled = Boolean.getBoolean(enabledStr); + if (enabled) { + init(); + } } } @@ -62,25 +68,14 @@ public class WSO2CEPStatsPublisher implements LoadBalancerStatsPublisher { Agent agent = new Agent(agentConfiguration); // Initialize asynchronous data publisher - asyncDataPublisher = new AsyncDataPublisher("tcp://"+ip+":"+port+"", "admin", "admin", agent); - String streamDefinition = "{" + - " 'name':'" + CALL_CENTER_DATA_STREAM + "'," + - " 'version':'" + VERSION + "'," + - " 'nickName': 'lb stats'," + - " 'description': 'lb stats'," + - " 'metaData':[]," + - " 'payloadData':[" + - " {'name':'cluster_id','type':'STRING'}," + - " {'name':'in_flight_requests','type':'INT'}" + - " ]" + - "}"; - asyncDataPublisher.addStreamDefinition(streamDefinition, CALL_CENTER_DATA_STREAM, VERSION); + asyncDataPublisher = new AsyncDataPublisher("tcp://" + ip + ":" + port + "", username, password, agent); + asyncDataPublisher.addStreamDefinition(streamDefinition); } @Override public void setEnabled(boolean enabled) { this.enabled = enabled; - if(this.enabled) { + if (this.enabled) { init(); } } @@ -91,35 +86,24 @@ public class WSO2CEPStatsPublisher implements LoadBalancerStatsPublisher { } @Override - public void publish(Map<String, Integer> stats) { - if(!isEnabled()) { + public void publish(Object[] payload) { + if (!isEnabled()) { throw new RuntimeException("Statistics publisher is not enabled"); } - for (Map.Entry<String, Integer> entry : stats.entrySet()) { + Event event = new Event(); + event.setPayloadData(payload); + event.setArbitraryDataMap(new HashMap<String, String>()); - Object[] payload = new Object[]{entry.getKey(), entry.getValue()}; - Event event = eventObject(null, null, payload, new HashMap<String, String>()); - try { - if(log.isInfoEnabled()) { - log.info(String.format("Publishing statistics event: [stream] %s [version] %s [payload] %s", CALL_CENTER_DATA_STREAM, VERSION, Arrays.toString(payload))); - } - asyncDataPublisher.publish(CALL_CENTER_DATA_STREAM, VERSION, event); - } catch (AgentException e) { - log.error("Failed to publish events. ", e); + try { + if (log.isDebugEnabled()) { + log.debug(String.format("Publishing cep event: [stream] %s [version] %s", streamDefinition.getName(), streamDefinition.getVersion())); + } + asyncDataPublisher.publish(streamDefinition.getName(), streamDefinition.getVersion(), event); + } catch (AgentException e) { + if (log.isErrorEnabled()) { + log.error(String.format("Could not publish cep event: [stream] %s [version] %s", streamDefinition.getName(), streamDefinition.getVersion()), e); } - } - stats = null; - } - - private static Event eventObject(Object[] correlationData, Object[] metaData, - Object[] payLoadData, HashMap<String, String> map) { - Event event = new Event(); - event.setCorrelationData(correlationData); - event.setMetaData(metaData); - event.setPayloadData(payLoadData); - event.setArbitraryDataMap(map); - return event; } } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9ced6550/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java index 95dc039..f9b607a 100644 --- a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java +++ b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java @@ -39,7 +39,7 @@ public class LoadBalancerExtension implements Runnable { private LoadBalancerStatsReader statsReader; private boolean loadBalancerStarted; private TopologyReceiver topologyReceiver; - private LoadBalancerStatsNotifier statsNotifier; + private LoadBalancerInFlightRequestCountNotifier statsNotifier; private boolean terminated; public LoadBalancerExtension(LoadBalancer loadBalancer, LoadBalancerStatsReader statsReader) { @@ -60,7 +60,7 @@ public class LoadBalancerExtension implements Runnable { topologyReceiverThread.start(); // Start stats notifier thread - statsNotifier = new LoadBalancerStatsNotifier(statsReader); + statsNotifier = new LoadBalancerInFlightRequestCountNotifier(statsReader); Thread statsNotifierThread = new Thread(statsNotifier); statsNotifierThread.start(); http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9ced6550/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerInFlightRequestCountNotifier.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerInFlightRequestCountNotifier.java b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerInFlightRequestCountNotifier.java new file mode 100644 index 0000000..57b969f --- /dev/null +++ b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerInFlightRequestCountNotifier.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.load.balancer.extension.api; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.load.balancer.common.statistics.WSO2CEPInFlightRequestPublisher; +import org.apache.stratos.messaging.domain.topology.Cluster; +import org.apache.stratos.messaging.domain.topology.Service; +import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; + +/** + * Load balancer statistics notifier thread for publishing statistics periodically to CEP. + */ +public class LoadBalancerInFlightRequestCountNotifier implements Runnable { + private static final Log log = LogFactory.getLog(LoadBalancerInFlightRequestCountNotifier.class); + + private LoadBalancerStatsReader statsReader; + private final WSO2CEPInFlightRequestPublisher statsPublisher; + private long statsPublisherInterval = 15000; + private boolean terminated; + + public LoadBalancerInFlightRequestCountNotifier(LoadBalancerStatsReader statsReader) { + this.statsReader = statsReader; + this.statsPublisher = new WSO2CEPInFlightRequestPublisher(); + + String interval = System.getProperty("stats.notifier.interval"); + if (interval != null) { + statsPublisherInterval = Long.getLong(interval); + } + } + + @Override + public void run() { + while (!terminated) { + try { + try { + Thread.sleep(statsPublisherInterval); + } catch (InterruptedException ignore) { + } + + if (statsPublisher.isEnabled()) { + for (Service service : TopologyManager.getTopology().getServices()) { + for (Cluster cluster : service.getClusters()) { + statsPublisher.publish(cluster.getClusterId(), statsReader.getInFlightRequestCount(cluster.getClusterId())); + } + } + } else if (log.isWarnEnabled()) { + log.warn("CEP statistics publisher is disabled"); + } + } catch (Exception e) { + if (log.isErrorEnabled()) { + log.error("Could not publish in-flight request count", e); + } + } + } + } + + /** + * Terminate load balancer statistics notifier thread. + */ + public void terminate() { + terminated = true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9ced6550/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsNotifier.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsNotifier.java b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsNotifier.java deleted file mode 100644 index 47a545e..0000000 --- a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsNotifier.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.stratos.load.balancer.extension.api; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.load.balancer.common.statistics.LoadBalancerStatsPublisher; -import org.apache.stratos.load.balancer.common.statistics.WSO2CEPStatsPublisher; -import org.apache.stratos.messaging.domain.topology.Cluster; -import org.apache.stratos.messaging.domain.topology.Service; -import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; - -import java.util.HashMap; -import java.util.Map; - -/** - * Load balancer statistics notifier thread for publishing statistics periodically to CEP. - */ -public class LoadBalancerStatsNotifier implements Runnable { - private static final Log log = LogFactory.getLog(LoadBalancerStatsNotifier.class); - - private LoadBalancerStatsReader statsReader; - private final LoadBalancerStatsPublisher statsPublisher; - private long statsPublisherInterval = 15000; - private boolean terminated; - - public LoadBalancerStatsNotifier(LoadBalancerStatsReader statsReader) { - this.statsReader = statsReader; - this.statsPublisher = new WSO2CEPStatsPublisher(); - - String interval = System.getProperty("stats.notifier.interval"); - if (interval != null) { - statsPublisherInterval = Long.getLong(interval); - } - } - - @Override - public void run() { - while (!terminated) { - try { - try { - Thread.sleep(statsPublisherInterval); - } catch (InterruptedException ignore) { - } - Map<String, Integer> stats = new HashMap<String, Integer>(); - for (Service service : TopologyManager.getTopology().getServices()) { - for (Cluster cluster : service.getClusters()) { - stats.put(cluster.getClusterId(), statsReader.getInFlightRequestCount(cluster.getClusterId())); - } - } - if (stats.size() > 0) { - if(statsPublisher.isEnabled()) { - statsPublisher.publish(stats); - } - else if (log.isWarnEnabled()) { - log.warn("Load balancer statistics publisher is disabled"); - } - } - } catch (Exception e) { - if (log.isErrorEnabled()) { - log.error("Could not publish load balancer stats", e); - } - } - } - } - - /** - * Terminate load balancer statistics notifier thread. - */ - public void terminate() { - terminated = true; - } -} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9ced6550/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java index f516a52..7b8c816 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java @@ -24,7 +24,7 @@ import org.apache.axis2.description.TransportInDescription; import org.apache.http.protocol.HTTP; import org.apache.stratos.load.balancer.RequestDelegator; import org.apache.stratos.load.balancer.algorithm.LoadBalanceAlgorithmFactory; -import org.apache.stratos.load.balancer.statistics.LoadBalancerStatsCollector; +import org.apache.stratos.load.balancer.statistics.LoadBalancerInFlightRequestCountCollector; import org.apache.stratos.load.balancer.util.Constants; import org.apache.stratos.messaging.domain.topology.Member; import org.apache.stratos.messaging.domain.topology.Port; @@ -411,7 +411,7 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints setupLoadBalancerContextProperties(synCtx); // Update health stats - LoadBalancerStatsCollector.getInstance().incrementRequestInflightCount(currentMember.getDomain()); + LoadBalancerInFlightRequestCountCollector.getInstance().incrementRequestInflightCount(currentMember.getDomain()); // Set the cluster id in the message context synCtx.setProperty(Constants.CLUSTER_ID, currentMember.getDomain()); http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9ced6550/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java index d75b460..a21a83b 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java @@ -18,7 +18,7 @@ */ package org.apache.stratos.load.balancer.mediators; -import org.apache.stratos.load.balancer.statistics.LoadBalancerStatsCollector; +import org.apache.stratos.load.balancer.statistics.LoadBalancerInFlightRequestCountCollector; import org.apache.stratos.load.balancer.util.Constants; import org.apache.synapse.ManagedLifecycle; import org.apache.synapse.MessageContext; @@ -36,7 +36,7 @@ public class ResponseInterceptor extends AbstractMediator implements ManagedLife log.debug("Mediation started " + ResponseInterceptor.class.getName()); } String clusterId = (String) synCtx.getProperty(Constants.CLUSTER_ID); - LoadBalancerStatsCollector.getInstance().decrementRequestInflightCount(clusterId); + LoadBalancerInFlightRequestCountCollector.getInstance().decrementRequestInflightCount(clusterId); return true; } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9ced6550/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerInFlightRequestCountCollector.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerInFlightRequestCountCollector.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerInFlightRequestCountCollector.java new file mode 100644 index 0000000..0ac8e0c --- /dev/null +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerInFlightRequestCountCollector.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.load.balancer.statistics; + +import java.util.HashMap; +import java.util.Map; +import java.util.Observable; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.load.balancer.statistics.observers.WSO2CEPInFlightRequestCountObserver; + +/** + * This is the load balancing stats collector and any observer can get registered here + * and receive notifications periodically. + * This is a Singleton object. + * @author nirmal + * + */ +public class LoadBalancerInFlightRequestCountCollector extends Observable{ + private static final Log log = LogFactory.getLog(LoadBalancerInFlightRequestCountCollector.class); + + private static LoadBalancerInFlightRequestCountCollector collector; + private Map<String, Integer> clusterIdToRequestInflightCountMap; + private Thread notifier; + + private LoadBalancerInFlightRequestCountCollector() { + clusterIdToRequestInflightCountMap = new ConcurrentHashMap<String, Integer>(); + if (notifier == null || (notifier != null && !notifier.isAlive())) { + notifier = new Thread(new ObserverNotifier()); + notifier.start(); + } + } + + public static LoadBalancerInFlightRequestCountCollector getInstance() { + if (collector == null) { + synchronized (LoadBalancerInFlightRequestCountCollector.class) { + if (collector == null) { + collector = new LoadBalancerInFlightRequestCountCollector(); + // add observers + collector.addObserver(new WSO2CEPInFlightRequestCountObserver()); + } + } + } + return collector; + } + + public void setRequestInflightCount(String clusterId, int value) { + if(clusterId == null) { + return; + } + + clusterIdToRequestInflightCountMap.put(clusterId, value); + setChanged(); + } + + public void incrementRequestInflightCount(String clusterId) { + incrementRequestInflightCount(clusterId, 1); + } + + public void incrementRequestInflightCount(String clusterId, int value) { + if(clusterId == null) { + return; + } + + if(clusterIdToRequestInflightCountMap.get(clusterId) != null) { + value += clusterIdToRequestInflightCountMap.get(clusterId); + } + clusterIdToRequestInflightCountMap.put(clusterId, value); + setChanged(); + } + + public void decrementRequestInflightCount(String clusterId) { + decrementRequestInflightCount(clusterId , 1); + } + + public void decrementRequestInflightCount(String clusterId, int value) { + if(clusterId == null) { + return; + } + + if(clusterIdToRequestInflightCountMap.get(clusterId) != null) { + value += clusterIdToRequestInflightCountMap.get(clusterId); + } + clusterIdToRequestInflightCountMap.put(clusterId, value); + setChanged(); + } + + + /** + * This thread will notify all the observers of this subject. + * @author nirmal + * + */ + private class ObserverNotifier implements Runnable { + + @Override + public void run() { + if(log.isInfoEnabled()) { + log.info("Load balancing statistics notifier thread started"); + } + while(true) { + try { + Thread.sleep(15000); + } catch (InterruptedException ignore) { + } + LoadBalancerInFlightRequestCountCollector.getInstance().notifyObservers(new HashMap<String, Integer>(clusterIdToRequestInflightCountMap)); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9ced6550/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatsCollector.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatsCollector.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatsCollector.java deleted file mode 100644 index 895657f..0000000 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatsCollector.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.stratos.load.balancer.statistics; - -import java.util.HashMap; -import java.util.Map; -import java.util.Observable; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.load.balancer.statistics.observers.WSO2CEPStatsObserver; - -/** - * This is the load balancing stats collector and any observer can get registered here - * and receive notifications periodically. - * This is a Singleton object. - * @author nirmal - * - */ -public class LoadBalancerStatsCollector extends Observable{ - private static final Log log = LogFactory.getLog(LoadBalancerStatsCollector.class); - - private static LoadBalancerStatsCollector collector; - private Map<String, Integer> clusterIdToRequestInflightCountMap; - private Thread notifier; - - private LoadBalancerStatsCollector() { - clusterIdToRequestInflightCountMap = new ConcurrentHashMap<String, Integer>(); - if (notifier == null || (notifier != null && !notifier.isAlive())) { - notifier = new Thread(new ObserverNotifier()); - notifier.start(); - } - } - - public static LoadBalancerStatsCollector getInstance() { - if (collector == null) { - synchronized (LoadBalancerStatsCollector.class) { - if (collector == null) { - collector = new LoadBalancerStatsCollector(); - // add observers - collector.addObserver(new WSO2CEPStatsObserver()); - } - } - } - return collector; - } - - public void setRequestInflightCount(String clusterId, int value) { - if(clusterId == null) { - return; - } - - clusterIdToRequestInflightCountMap.put(clusterId, value); - setChanged(); - } - - public void incrementRequestInflightCount(String clusterId) { - incrementRequestInflightCount(clusterId, 1); - } - - public void incrementRequestInflightCount(String clusterId, int value) { - if(clusterId == null) { - return; - } - - if(clusterIdToRequestInflightCountMap.get(clusterId) != null) { - value += clusterIdToRequestInflightCountMap.get(clusterId); - } - clusterIdToRequestInflightCountMap.put(clusterId, value); - setChanged(); - } - - public void decrementRequestInflightCount(String clusterId) { - decrementRequestInflightCount(clusterId , 1); - } - - public void decrementRequestInflightCount(String clusterId, int value) { - if(clusterId == null) { - return; - } - - if(clusterIdToRequestInflightCountMap.get(clusterId) != null) { - value += clusterIdToRequestInflightCountMap.get(clusterId); - } - clusterIdToRequestInflightCountMap.put(clusterId, value); - setChanged(); - } - - - /** - * This thread will notify all the observers of this subject. - * @author nirmal - * - */ - private class ObserverNotifier implements Runnable { - - @Override - public void run() { - if(log.isInfoEnabled()) { - log.info("Load balancing statistics notifier thread started"); - } - while(true) { - try { - Thread.sleep(15000); - } catch (InterruptedException ignore) { - } - LoadBalancerStatsCollector.getInstance().notifyObservers(new HashMap<String, Integer>(clusterIdToRequestInflightCountMap)); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9ced6550/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPInFlightRequestCountObserver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPInFlightRequestCountObserver.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPInFlightRequestCountObserver.java new file mode 100644 index 0000000..9a77778 --- /dev/null +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPInFlightRequestCountObserver.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.load.balancer.statistics.observers; + +import java.util.*; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.load.balancer.common.statistics.WSO2CEPInFlightRequestPublisher; + +public class WSO2CEPInFlightRequestCountObserver implements Observer { + private static final Log log = LogFactory.getLog(WSO2CEPInFlightRequestCountObserver.class); + private WSO2CEPInFlightRequestPublisher publisher; + + public WSO2CEPInFlightRequestCountObserver() { + this.publisher = new WSO2CEPInFlightRequestPublisher(); + } + + public void update(Observable observable, Object object) { + if (object != null && object instanceof Map<?, ?>) { + try { + if (publisher.isEnabled()) { + Map<String, Integer> stats = (Map<String, Integer>) object; + // Publish event per cluster id + for (String clusterId : stats.keySet()) { + // Publish event + publisher.publish(clusterId, stats.get(clusterId)); + } + } else if (log.isWarnEnabled()) { + log.warn("CEP statistics publisher is disabled"); + } + } catch (Exception e) { + if (log.isErrorEnabled()) { + log.error("Could not publish in-flight request count", e); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9ced6550/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPStatsObserver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPStatsObserver.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPStatsObserver.java deleted file mode 100644 index ebc793b..0000000 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPStatsObserver.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.stratos.load.balancer.statistics.observers; - -import java.util.Map; -import java.util.Observable; -import java.util.Observer; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.load.balancer.common.statistics.WSO2CEPStatsPublisher; - -public class WSO2CEPStatsObserver implements Observer{ - private static final Log log = LogFactory.getLog(WSO2CEPStatsObserver.class); - private WSO2CEPStatsPublisher statsPublisher; - - public WSO2CEPStatsObserver() { - this.statsPublisher = new WSO2CEPStatsPublisher(); - } - - public void update(Observable arg0, Object arg1) { - if(arg1 != null && arg1 instanceof Map<?, ?>) { - Map<String, Integer> stats = (Map<String, Integer>)arg1; - if(statsPublisher.isEnabled()) { - statsPublisher.publish(stats); - } - else if (log.isWarnEnabled()) { - log.warn("Load balancer statistics publisher is disabled"); - } - } - } -}
