Updated Branches: refs/heads/master a189459ce -> aa539dbdf
Moving the AutoscalerTopologyReceiver.java to correct package Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/aa539dbd Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/aa539dbd Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/aa539dbd Branch: refs/heads/master Commit: aa539dbdfd726fbe97e505e1135c8e3bdbe0ceeb Parents: a189459 Author: Lahiru Sandaruwan <[email protected]> Authored: Mon Dec 23 15:06:22 2013 +0530 Committer: Lahiru Sandaruwan <[email protected]> Committed: Mon Dec 23 15:06:22 2013 +0530 ---------------------------------------------------------------------- .../topology/AutoscalerTopologyReceiver.java | 401 +++++++++++++++++++ .../topology/AutoscalerTopologyReceiver.java | 401 ------------------- 2 files changed, 401 insertions(+), 401 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/aa539dbd/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyReceiver.java new file mode 100644 index 0000000..b216df1 --- /dev/null +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyReceiver.java @@ -0,0 +1,401 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.autoscaler.topology; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.autoscaler.AutoscalerContext; +import org.apache.stratos.autoscaler.MemberStatsContext; +import org.apache.stratos.autoscaler.NetworkPartitionContext; +import org.apache.stratos.autoscaler.PartitionContext; +import org.apache.stratos.autoscaler.exception.PartitionValidationException; +import org.apache.stratos.autoscaler.exception.PolicyValidationException; +import org.apache.stratos.autoscaler.monitor.AbstractMonitor; +import org.apache.stratos.autoscaler.monitor.ClusterMonitor; +import org.apache.stratos.autoscaler.monitor.LbClusterMonitor; +import org.apache.stratos.autoscaler.util.AutoscalerUtil; +import org.apache.stratos.messaging.domain.topology.Cluster; +import org.apache.stratos.messaging.domain.topology.Service; +import org.apache.stratos.messaging.event.Event; +import org.apache.stratos.messaging.event.topology.ClusterCreatedEvent; +import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent; +import org.apache.stratos.messaging.event.topology.MemberActivatedEvent; +import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent; +import org.apache.stratos.messaging.listener.topology.*; +import org.apache.stratos.messaging.message.processor.topology.TopologyMessageProcessorChain; +import org.apache.stratos.messaging.message.receiver.topology.TopologyEventMessageDelegator; +import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; +import org.apache.stratos.messaging.message.receiver.topology.TopologyReceiver; + +import java.util.Collection; + +/** + * Load balancer topology receiver. + */ +public class AutoscalerTopologyReceiver implements Runnable { + + private static final Log log = LogFactory.getLog(AutoscalerTopologyReceiver.class); + + private TopologyReceiver topologyReceiver; + private boolean terminated; + + public AutoscalerTopologyReceiver() { + this.topologyReceiver = new TopologyReceiver(createMessageDelegator()); + } + + @Override + public void run() { + //FIXME this activated before autoscaler deployer activated. + try { + Thread.sleep(15000); + } catch (InterruptedException ignore) { + } + Thread thread = new Thread(topologyReceiver); + thread.start(); + if(log.isInfoEnabled()) { + log.info("Autoscaler topology receiver thread started"); + } + + // Keep the thread live until terminated + while (!terminated); + if(log.isInfoEnabled()) { + log.info("Autoscaler topology receiver thread terminated"); + } + } + + private TopologyEventMessageDelegator createMessageDelegator() { + TopologyMessageProcessorChain processorChain = createEventProcessorChain(); + processorChain.addEventListener(new CompleteTopologyEventListener() { + @Override + protected void onEvent(Event event) { + + try { + TopologyManager.acquireReadLock(); + for(Service service : TopologyManager.getTopology().getServices()) { + for(Cluster cluster : service.getClusters()) { + Thread th; + if(cluster.isLbCluster()){ + th = new Thread(new LBClusterMonitorAdder(cluster)); + }else{ + th = new Thread(new ClusterMonitorAdder(cluster)); + } + + th.start(); + } + } + } + finally { + TopologyManager.releaseReadLock(); + } + } + + }); + return new TopologyEventMessageDelegator(processorChain); + } + + private TopologyMessageProcessorChain createEventProcessorChain() { + // Listen to topology events that affect clusters + TopologyMessageProcessorChain processorChain = new TopologyMessageProcessorChain(); + processorChain.addEventListener(new ClusterCreatedEventListener() { + @Override + protected void onEvent(Event event) { + try { + ClusterCreatedEvent e = (ClusterCreatedEvent) event; + TopologyManager.acquireReadLock(); + Service service = TopologyManager.getTopology().getService(e.getServiceName()); + Cluster cluster = service.getCluster(e.getClusterId()); + if (cluster.isLbCluster()) { + Thread th = new Thread(new LBClusterMonitorAdder(cluster)); + th.start(); + } else { + Thread th = new Thread(new ClusterMonitorAdder(cluster)); + th.start(); + } + } finally { + TopologyManager.releaseReadLock(); + } + } + + }); + + processorChain.addEventListener(new ClusterRemovedEventListener() { + @Override + protected void onEvent(Event event) { + try { + ClusterRemovedEvent e = (ClusterRemovedEvent) event; + TopologyManager.acquireReadLock(); + String serviceName = e.getServiceName(); + String clusterId = e.getClusterId(); + + AbstractMonitor monitor; + + if(TopologyManager.getTopology().getService(serviceName).getCluster(clusterId).isLbCluster()){ + monitor = AutoscalerContext.getInstance().removeLbMonitor(clusterId); + + } else { + monitor = AutoscalerContext.getInstance().removeMonitor(clusterId); + } + + monitor.destroy(); + if(log.isDebugEnabled()) { + log.debug(String.format("Cluster monitor has been removed: [cluster] %s ", clusterId)); + } + } + finally { + TopologyManager.releaseReadLock(); + } + } + + }); + + processorChain.addEventListener(new MemberStartedEventListener() { + @Override + protected void onEvent(Event event) { + + } + + }); + + processorChain.addEventListener(new MemberTerminatedEventListener() { + @Override + protected void onEvent(Event event) { + + try { + TopologyManager.acquireReadLock(); + MemberTerminatedEvent e = (MemberTerminatedEvent) event; + String networkPartitionId = e.getNetworkPartitionId(); + String clusterId = e.getClusterId(); + AbstractMonitor monitor; + + if(AutoscalerContext.getInstance().moniterExist(clusterId)){ + + monitor = AutoscalerContext.getInstance().getMonitor(clusterId); + } else { + + //This is LB member + monitor = AutoscalerContext.getInstance().getLBMonitor(clusterId); + } + + NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId); + + networkPartitionContext.getPartitionCtxt(e.getPartitionId()) + .removeMemberStatsContext(e.getMemberId()); + networkPartitionContext.decreaseMemberCountInPartitionBy(e.getPartitionId(), 1); +// ClusterContext clusCtx = monitor.getClusterCtxt(); +// String networkPartitionId = monitor. +// if (networkPartitionId != null) { +// NetworkPartitionContext networkPartContext = monitor.getNetworkPartitionCtxt(networkPartitionId); +// networkPartContext.decrementCurrentMemberCount(1); +// } + + } finally { + TopologyManager.releaseReadLock(); + } + } + + }); + + processorChain.addEventListener(new MemberActivatedEventListener() { + @Override + protected void onEvent(Event event) { + + try { + TopologyManager.acquireReadLock(); + + MemberActivatedEvent e = (MemberActivatedEvent)event; + String memberId = e.getMemberId(); + String partitionId = e.getPartitionId(); + String networkPartitionId = e.getNetworkPartitionId(); + + String serviceName = e.getServiceName(); + PartitionContext partitionContext; + String clusterId = e.getClusterId(); + AbstractMonitor monitor; + + if(AutoscalerContext.getInstance().moniterExist(clusterId)) { + monitor = AutoscalerContext.getInstance().getMonitor(clusterId); + partitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId).getPartitionCtxt(partitionId); + } else { + monitor = AutoscalerContext.getInstance().getLBMonitor(clusterId); + partitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId).getPartitionCtxt(partitionId); + } +// ClusterContext clusCtx = monitor.getClusterCtxt(); +// monitor.getNetworkPartitionCtxt(e.getId()).getPartitionCtxt(partitionId); +// .addMemberStatsContext(new MemberStatsContext(e.getMemberId())); + partitionContext.addMemberStatsContext(new MemberStatsContext(memberId)); +// PartitionContext partCtxt = monitor.getNetworkPartitionCtxt(e.getId()) +// .getPartitionCtxt(partitionId); + partitionContext.incrementCurrentMemberCount(1); + partitionContext.removePendingMember(memberId); + + } + finally{ + TopologyManager.releaseReadLock(); + } + } + }); + + processorChain.addEventListener(new ServiceRemovedEventListener() { + @Override + protected void onEvent(Event event) { +// try { +// TopologyManager.acquireReadLock(); +// +// // Remove all clusters of given service from context +// ServiceRemovedEvent serviceRemovedEvent = (ServiceRemovedEvent)event; +// for(Service service : TopologyManager.getTopology().getServices()) { +// for(Cluster cluster : service.getClusters()) { +// removeMonitor(cluster.getHostName()); +// } +// } +// } +// finally { +// TopologyManager.releaseReadLock(); +// } + } + }); + return processorChain; + } + + private class LBClusterMonitorAdder implements Runnable { + private Cluster cluster; + + public LBClusterMonitorAdder(Cluster cluster) { + this.cluster = cluster; + } + + public void run() { + LbClusterMonitor monitor; + try { + monitor = AutoscalerUtil.getLBClusterMonitor(cluster); + + } catch (PolicyValidationException e) { + String msg = "Cluster monitor creation failed for cluster: "+cluster.getClusterId(); + log.error(msg, e); + throw new RuntimeException(msg, e); + + } catch(PartitionValidationException e){ + String msg = "Cluster monitor creation failed for cluster: "+cluster.getClusterId(); + log.error(msg, e); + throw new RuntimeException(msg, e); + } + + Thread th = new Thread(monitor); + th.start(); + AutoscalerContext.getInstance().addLbMonitor(monitor); + log.info(String.format("LB Cluster monitor has been added: [cluster] %s", + cluster.getClusterId())); +// if (log.isDebugEnabled()) { +// log.debug(String.format("Cluster monitor has been added: [cluster] %s", +// cluster.getClusterId())); +// } + } + } + + private class ClusterMonitorAdder implements Runnable { + private Cluster cluster; + + public ClusterMonitorAdder(Cluster cluster) { + this.cluster = cluster; + } + + public void run() { + ClusterMonitor monitor; + try { + monitor = AutoscalerUtil.getClusterMonitor(cluster); + + } catch (PolicyValidationException e) { + String msg = "Cluster monitor creation failed for cluster: "+cluster.getClusterId(); + log.error(msg, e); + throw new RuntimeException(msg, e); + + } catch(PartitionValidationException e){ + String msg = "Cluster monitor creation failed for cluster: "+cluster.getClusterId(); + log.error(msg, e); + throw new RuntimeException(msg, e); + } + + Thread th = new Thread(monitor); + th.start(); + AutoscalerContext.getInstance().addMonitor(monitor); + log.info(String.format("Cluster monitor has been added: [cluster] %s", + cluster.getClusterId())); +// if (log.isDebugEnabled()) { +// log.debug(String.format("Cluster monitor has been added: [cluster] %s", +// cluster.getClusterId())); +// } + } + } + +// private void addClusterToContext(Cluster cluster) { +// ClusterContext ctxt; +// try { +// ctxt = AutoscalerUtil.getClusterMonitor(cluster); +// } catch (PolicyValidationException e) { +// String msg = "Cluster monitor creation failed for cluster: "+cluster.getClusterId(); +// log.error(msg, e); +// throw new RuntimeException(msg, e); +// } +// AutoscalerContext ruleCtxt = AutoscalerContext.getInstance(); +// ClusterMonitor monitor = +// new ClusterMonitor(cluster.getClusterId(), ctxt, +// ruleCtxt.getStatefulSession()); +// Thread th = new Thread(monitor); +// th.start(); +// AutoscalerContext.getInstance().addMonitor(monitor); +// if (log.isDebugEnabled()) { +// log.debug(String.format("Cluster monitor has been added: [cluster] %s", +// cluster.getClusterId())); +// } +// } + + private void removeMonitor(String clusterId) { + ClusterMonitor monitor = AutoscalerContext.getInstance().removeMonitor(clusterId); +// monitor.unsubscribe(); + monitor.destroy(); + if(log.isDebugEnabled()) { + log.debug(String.format("Cluster monitor has been removed: [cluster] %s ", clusterId)); + } + } + + private Cluster findCluster(String clusterId) { + if(clusterId == null) { + return null; + } + + Collection<Service> services = TopologyManager.getTopology().getServices(); + for (Service service : services) { + for (Cluster cluster : service.getClusters()) { + if (clusterId.equals(cluster.getClusterId())) { + return cluster; + } + } + } + return null; + } + + /** + * Terminate load balancer topology receiver thread. + */ + public void terminate() { + topologyReceiver.terminate(); + terminated = true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/aa539dbd/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/topology/AutoscalerTopologyReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/topology/AutoscalerTopologyReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/topology/AutoscalerTopologyReceiver.java deleted file mode 100644 index b216df1..0000000 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/topology/AutoscalerTopologyReceiver.java +++ /dev/null @@ -1,401 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.stratos.autoscaler.topology; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.autoscaler.AutoscalerContext; -import org.apache.stratos.autoscaler.MemberStatsContext; -import org.apache.stratos.autoscaler.NetworkPartitionContext; -import org.apache.stratos.autoscaler.PartitionContext; -import org.apache.stratos.autoscaler.exception.PartitionValidationException; -import org.apache.stratos.autoscaler.exception.PolicyValidationException; -import org.apache.stratos.autoscaler.monitor.AbstractMonitor; -import org.apache.stratos.autoscaler.monitor.ClusterMonitor; -import org.apache.stratos.autoscaler.monitor.LbClusterMonitor; -import org.apache.stratos.autoscaler.util.AutoscalerUtil; -import org.apache.stratos.messaging.domain.topology.Cluster; -import org.apache.stratos.messaging.domain.topology.Service; -import org.apache.stratos.messaging.event.Event; -import org.apache.stratos.messaging.event.topology.ClusterCreatedEvent; -import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent; -import org.apache.stratos.messaging.event.topology.MemberActivatedEvent; -import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent; -import org.apache.stratos.messaging.listener.topology.*; -import org.apache.stratos.messaging.message.processor.topology.TopologyMessageProcessorChain; -import org.apache.stratos.messaging.message.receiver.topology.TopologyEventMessageDelegator; -import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; -import org.apache.stratos.messaging.message.receiver.topology.TopologyReceiver; - -import java.util.Collection; - -/** - * Load balancer topology receiver. - */ -public class AutoscalerTopologyReceiver implements Runnable { - - private static final Log log = LogFactory.getLog(AutoscalerTopologyReceiver.class); - - private TopologyReceiver topologyReceiver; - private boolean terminated; - - public AutoscalerTopologyReceiver() { - this.topologyReceiver = new TopologyReceiver(createMessageDelegator()); - } - - @Override - public void run() { - //FIXME this activated before autoscaler deployer activated. - try { - Thread.sleep(15000); - } catch (InterruptedException ignore) { - } - Thread thread = new Thread(topologyReceiver); - thread.start(); - if(log.isInfoEnabled()) { - log.info("Autoscaler topology receiver thread started"); - } - - // Keep the thread live until terminated - while (!terminated); - if(log.isInfoEnabled()) { - log.info("Autoscaler topology receiver thread terminated"); - } - } - - private TopologyEventMessageDelegator createMessageDelegator() { - TopologyMessageProcessorChain processorChain = createEventProcessorChain(); - processorChain.addEventListener(new CompleteTopologyEventListener() { - @Override - protected void onEvent(Event event) { - - try { - TopologyManager.acquireReadLock(); - for(Service service : TopologyManager.getTopology().getServices()) { - for(Cluster cluster : service.getClusters()) { - Thread th; - if(cluster.isLbCluster()){ - th = new Thread(new LBClusterMonitorAdder(cluster)); - }else{ - th = new Thread(new ClusterMonitorAdder(cluster)); - } - - th.start(); - } - } - } - finally { - TopologyManager.releaseReadLock(); - } - } - - }); - return new TopologyEventMessageDelegator(processorChain); - } - - private TopologyMessageProcessorChain createEventProcessorChain() { - // Listen to topology events that affect clusters - TopologyMessageProcessorChain processorChain = new TopologyMessageProcessorChain(); - processorChain.addEventListener(new ClusterCreatedEventListener() { - @Override - protected void onEvent(Event event) { - try { - ClusterCreatedEvent e = (ClusterCreatedEvent) event; - TopologyManager.acquireReadLock(); - Service service = TopologyManager.getTopology().getService(e.getServiceName()); - Cluster cluster = service.getCluster(e.getClusterId()); - if (cluster.isLbCluster()) { - Thread th = new Thread(new LBClusterMonitorAdder(cluster)); - th.start(); - } else { - Thread th = new Thread(new ClusterMonitorAdder(cluster)); - th.start(); - } - } finally { - TopologyManager.releaseReadLock(); - } - } - - }); - - processorChain.addEventListener(new ClusterRemovedEventListener() { - @Override - protected void onEvent(Event event) { - try { - ClusterRemovedEvent e = (ClusterRemovedEvent) event; - TopologyManager.acquireReadLock(); - String serviceName = e.getServiceName(); - String clusterId = e.getClusterId(); - - AbstractMonitor monitor; - - if(TopologyManager.getTopology().getService(serviceName).getCluster(clusterId).isLbCluster()){ - monitor = AutoscalerContext.getInstance().removeLbMonitor(clusterId); - - } else { - monitor = AutoscalerContext.getInstance().removeMonitor(clusterId); - } - - monitor.destroy(); - if(log.isDebugEnabled()) { - log.debug(String.format("Cluster monitor has been removed: [cluster] %s ", clusterId)); - } - } - finally { - TopologyManager.releaseReadLock(); - } - } - - }); - - processorChain.addEventListener(new MemberStartedEventListener() { - @Override - protected void onEvent(Event event) { - - } - - }); - - processorChain.addEventListener(new MemberTerminatedEventListener() { - @Override - protected void onEvent(Event event) { - - try { - TopologyManager.acquireReadLock(); - MemberTerminatedEvent e = (MemberTerminatedEvent) event; - String networkPartitionId = e.getNetworkPartitionId(); - String clusterId = e.getClusterId(); - AbstractMonitor monitor; - - if(AutoscalerContext.getInstance().moniterExist(clusterId)){ - - monitor = AutoscalerContext.getInstance().getMonitor(clusterId); - } else { - - //This is LB member - monitor = AutoscalerContext.getInstance().getLBMonitor(clusterId); - } - - NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId); - - networkPartitionContext.getPartitionCtxt(e.getPartitionId()) - .removeMemberStatsContext(e.getMemberId()); - networkPartitionContext.decreaseMemberCountInPartitionBy(e.getPartitionId(), 1); -// ClusterContext clusCtx = monitor.getClusterCtxt(); -// String networkPartitionId = monitor. -// if (networkPartitionId != null) { -// NetworkPartitionContext networkPartContext = monitor.getNetworkPartitionCtxt(networkPartitionId); -// networkPartContext.decrementCurrentMemberCount(1); -// } - - } finally { - TopologyManager.releaseReadLock(); - } - } - - }); - - processorChain.addEventListener(new MemberActivatedEventListener() { - @Override - protected void onEvent(Event event) { - - try { - TopologyManager.acquireReadLock(); - - MemberActivatedEvent e = (MemberActivatedEvent)event; - String memberId = e.getMemberId(); - String partitionId = e.getPartitionId(); - String networkPartitionId = e.getNetworkPartitionId(); - - String serviceName = e.getServiceName(); - PartitionContext partitionContext; - String clusterId = e.getClusterId(); - AbstractMonitor monitor; - - if(AutoscalerContext.getInstance().moniterExist(clusterId)) { - monitor = AutoscalerContext.getInstance().getMonitor(clusterId); - partitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId).getPartitionCtxt(partitionId); - } else { - monitor = AutoscalerContext.getInstance().getLBMonitor(clusterId); - partitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId).getPartitionCtxt(partitionId); - } -// ClusterContext clusCtx = monitor.getClusterCtxt(); -// monitor.getNetworkPartitionCtxt(e.getId()).getPartitionCtxt(partitionId); -// .addMemberStatsContext(new MemberStatsContext(e.getMemberId())); - partitionContext.addMemberStatsContext(new MemberStatsContext(memberId)); -// PartitionContext partCtxt = monitor.getNetworkPartitionCtxt(e.getId()) -// .getPartitionCtxt(partitionId); - partitionContext.incrementCurrentMemberCount(1); - partitionContext.removePendingMember(memberId); - - } - finally{ - TopologyManager.releaseReadLock(); - } - } - }); - - processorChain.addEventListener(new ServiceRemovedEventListener() { - @Override - protected void onEvent(Event event) { -// try { -// TopologyManager.acquireReadLock(); -// -// // Remove all clusters of given service from context -// ServiceRemovedEvent serviceRemovedEvent = (ServiceRemovedEvent)event; -// for(Service service : TopologyManager.getTopology().getServices()) { -// for(Cluster cluster : service.getClusters()) { -// removeMonitor(cluster.getHostName()); -// } -// } -// } -// finally { -// TopologyManager.releaseReadLock(); -// } - } - }); - return processorChain; - } - - private class LBClusterMonitorAdder implements Runnable { - private Cluster cluster; - - public LBClusterMonitorAdder(Cluster cluster) { - this.cluster = cluster; - } - - public void run() { - LbClusterMonitor monitor; - try { - monitor = AutoscalerUtil.getLBClusterMonitor(cluster); - - } catch (PolicyValidationException e) { - String msg = "Cluster monitor creation failed for cluster: "+cluster.getClusterId(); - log.error(msg, e); - throw new RuntimeException(msg, e); - - } catch(PartitionValidationException e){ - String msg = "Cluster monitor creation failed for cluster: "+cluster.getClusterId(); - log.error(msg, e); - throw new RuntimeException(msg, e); - } - - Thread th = new Thread(monitor); - th.start(); - AutoscalerContext.getInstance().addLbMonitor(monitor); - log.info(String.format("LB Cluster monitor has been added: [cluster] %s", - cluster.getClusterId())); -// if (log.isDebugEnabled()) { -// log.debug(String.format("Cluster monitor has been added: [cluster] %s", -// cluster.getClusterId())); -// } - } - } - - private class ClusterMonitorAdder implements Runnable { - private Cluster cluster; - - public ClusterMonitorAdder(Cluster cluster) { - this.cluster = cluster; - } - - public void run() { - ClusterMonitor monitor; - try { - monitor = AutoscalerUtil.getClusterMonitor(cluster); - - } catch (PolicyValidationException e) { - String msg = "Cluster monitor creation failed for cluster: "+cluster.getClusterId(); - log.error(msg, e); - throw new RuntimeException(msg, e); - - } catch(PartitionValidationException e){ - String msg = "Cluster monitor creation failed for cluster: "+cluster.getClusterId(); - log.error(msg, e); - throw new RuntimeException(msg, e); - } - - Thread th = new Thread(monitor); - th.start(); - AutoscalerContext.getInstance().addMonitor(monitor); - log.info(String.format("Cluster monitor has been added: [cluster] %s", - cluster.getClusterId())); -// if (log.isDebugEnabled()) { -// log.debug(String.format("Cluster monitor has been added: [cluster] %s", -// cluster.getClusterId())); -// } - } - } - -// private void addClusterToContext(Cluster cluster) { -// ClusterContext ctxt; -// try { -// ctxt = AutoscalerUtil.getClusterMonitor(cluster); -// } catch (PolicyValidationException e) { -// String msg = "Cluster monitor creation failed for cluster: "+cluster.getClusterId(); -// log.error(msg, e); -// throw new RuntimeException(msg, e); -// } -// AutoscalerContext ruleCtxt = AutoscalerContext.getInstance(); -// ClusterMonitor monitor = -// new ClusterMonitor(cluster.getClusterId(), ctxt, -// ruleCtxt.getStatefulSession()); -// Thread th = new Thread(monitor); -// th.start(); -// AutoscalerContext.getInstance().addMonitor(monitor); -// if (log.isDebugEnabled()) { -// log.debug(String.format("Cluster monitor has been added: [cluster] %s", -// cluster.getClusterId())); -// } -// } - - private void removeMonitor(String clusterId) { - ClusterMonitor monitor = AutoscalerContext.getInstance().removeMonitor(clusterId); -// monitor.unsubscribe(); - monitor.destroy(); - if(log.isDebugEnabled()) { - log.debug(String.format("Cluster monitor has been removed: [cluster] %s ", clusterId)); - } - } - - private Cluster findCluster(String clusterId) { - if(clusterId == null) { - return null; - } - - Collection<Service> services = TopologyManager.getTopology().getServices(); - for (Service service : services) { - for (Cluster cluster : service.getClusters()) { - if (clusterId.equals(cluster.getClusterId())) { - return cluster; - } - } - } - return null; - } - - /** - * Terminate load balancer topology receiver thread. - */ - public void terminate() { - topologyReceiver.terminate(); - terminated = true; - } -}
