Repository: stratos Updated Branches: refs/heads/docker-grouping-merge e39ba22d9 -> ed5feced6
http://git-wip-us.apache.org/repos/asf/stratos/blob/ed5feced/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java deleted file mode 100644 index a5c6577..0000000 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java +++ /dev/null @@ -1,500 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.stratos.autoscaler.message.receiver.health; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.autoscaler.AutoscalerContext; -import org.apache.stratos.autoscaler.monitor.cluster.AbstractClusterMonitor; -import org.apache.stratos.messaging.domain.topology.Cluster; -import org.apache.stratos.messaging.domain.topology.Member; -import org.apache.stratos.messaging.domain.topology.Service; -import org.apache.stratos.messaging.event.health.stat.AverageLoadAverageEvent; -import org.apache.stratos.messaging.event.health.stat.AverageMemoryConsumptionEvent; -import org.apache.stratos.messaging.event.health.stat.AverageRequestsInFlightEvent; -import org.apache.stratos.messaging.event.health.stat.GradientOfLoadAverageEvent; -import org.apache.stratos.messaging.event.health.stat.GradientOfMemoryConsumptionEvent; -import org.apache.stratos.messaging.event.health.stat.GradientOfRequestsInFlightEvent; -import org.apache.stratos.messaging.event.health.stat.MemberAverageLoadAverageEvent; -import org.apache.stratos.messaging.event.health.stat.MemberAverageMemoryConsumptionEvent; -import org.apache.stratos.messaging.event.health.stat.MemberFaultEvent; -import org.apache.stratos.messaging.event.health.stat.MemberGradientOfLoadAverageEvent; -import org.apache.stratos.messaging.event.health.stat.MemberGradientOfMemoryConsumptionEvent; -import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfLoadAverageEvent; -import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfLoadAverageEvent; -import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfMemoryConsumptionEvent; -import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfRequestsInFlightEvent; -import org.apache.stratos.messaging.listener.health.stat.AverageLoadAverageEventListener; -import org.apache.stratos.messaging.listener.health.stat.AverageMemoryConsumptionEventListener; -import org.apache.stratos.messaging.listener.health.stat.AverageRequestsInFlightEventListener; -import org.apache.stratos.messaging.listener.health.stat.GradientOfLoadAverageEventListener; -import org.apache.stratos.messaging.listener.health.stat.GradientOfMemoryConsumptionEventListener; -import org.apache.stratos.messaging.listener.health.stat.GradientOfRequestsInFlightEventListener; -import org.apache.stratos.messaging.listener.health.stat.MemberAverageLoadAverageEventListener; -import org.apache.stratos.messaging.listener.health.stat.MemberAverageMemoryConsumptionEventListener; -import org.apache.stratos.messaging.listener.health.stat.MemberFaultEventListener; -import org.apache.stratos.messaging.listener.health.stat.MemberGradientOfLoadAverageEventListener; -import org.apache.stratos.messaging.listener.health.stat.MemberGradientOfMemoryConsumptionEventListener; -import org.apache.stratos.messaging.listener.health.stat.MemberSecondDerivativeOfLoadAverageEventListener; -import org.apache.stratos.messaging.listener.health.stat.MemberSecondDerivativeOfMemoryConsumptionEventListener; -import org.apache.stratos.messaging.listener.health.stat.SecondDerivativeOfLoadAverageEventListener; -import org.apache.stratos.messaging.listener.health.stat.SecondDerivativeOfMemoryConsumptionEventListener; -import org.apache.stratos.messaging.listener.health.stat.SecondDerivativeOfRequestsInFlightEventListener; -import org.apache.stratos.messaging.message.receiver.health.stat.HealthStatEventReceiver; -import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; - - -/** - * A thread for processing topology messages and updating the topology data structure. - */ -public class AutoscalerHealthStatEventReceiver implements Runnable { - - private static final Log log = LogFactory.getLog(AutoscalerHealthStatEventReceiver.class); - private boolean terminated = false; - - private HealthStatEventReceiver healthStatEventReceiver; - - public AutoscalerHealthStatEventReceiver() { - this.healthStatEventReceiver = new HealthStatEventReceiver(); - addEventListeners(); - } - - @Override - public void run() { - //FIXME this activated before autoscaler deployer activated. - try { - Thread.sleep(15000); - } catch (InterruptedException ignore) { - } - Thread thread = new Thread(healthStatEventReceiver); - thread.start(); - if(log.isInfoEnabled()) { - log.info("Autoscaler health stat event receiver thread started"); - } - - // Keep the thread live until terminated - while (!terminated){ - try { - Thread.sleep(1000); - } catch (InterruptedException ignore) { - } - } - if(log.isInfoEnabled()) { - log.info("Autoscaler health stat event receiver thread terminated"); - } - } - - private void addEventListeners() { - // Listen to health stat events that affect clusters - healthStatEventReceiver.addEventListener(new AverageLoadAverageEventListener() { - @Override - protected void onEvent(org.apache.stratos.messaging.event.Event event) { - AverageLoadAverageEvent averageLoadAverageEvent = (AverageLoadAverageEvent) event; - String clusterId = averageLoadAverageEvent.getClusterId(); - AutoscalerContext asCtx = AutoscalerContext.getInstance(); - AbstractClusterMonitor monitor; - monitor = asCtx.getClusterMonitor(clusterId); - if (null == monitor) { - if (log.isDebugEnabled()) { - log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); - } - return; - } - monitor.handleAverageLoadAverageEvent(averageLoadAverageEvent); - } - - }); - healthStatEventReceiver.addEventListener(new AverageMemoryConsumptionEventListener() { - @Override - protected void onEvent(org.apache.stratos.messaging.event.Event event) { - AverageMemoryConsumptionEvent averageMemoryConsumptionEvent = (AverageMemoryConsumptionEvent) event; - String clusterId = averageMemoryConsumptionEvent.getClusterId(); - AutoscalerContext asCtx = AutoscalerContext.getInstance(); - AbstractClusterMonitor monitor; - monitor = asCtx.getClusterMonitor(clusterId); - if (null == monitor) { - if (log.isDebugEnabled()) { - log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); - } - return; - } - monitor.handleAverageMemoryConsumptionEvent(averageMemoryConsumptionEvent); - } - }); - - healthStatEventReceiver.addEventListener(new AverageRequestsInFlightEventListener() { - @Override - protected void onEvent(org.apache.stratos.messaging.event.Event event) { - AverageRequestsInFlightEvent averageRequestsInFlightEvent = (AverageRequestsInFlightEvent) event; - String clusterId = averageRequestsInFlightEvent.getClusterId(); - AutoscalerContext asCtx = AutoscalerContext.getInstance(); - AbstractClusterMonitor monitor; - monitor = asCtx.getClusterMonitor(clusterId); - if (null == monitor) { - if (log.isDebugEnabled()) { - log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); - } - return; - } - monitor.handleAverageRequestsInFlightEvent(averageRequestsInFlightEvent); - } - }); - - healthStatEventReceiver.addEventListener(new GradientOfLoadAverageEventListener() { - @Override - protected void onEvent(org.apache.stratos.messaging.event.Event event) { - GradientOfLoadAverageEvent gradientOfLoadAverageEvent = (GradientOfLoadAverageEvent) event; - String clusterId = gradientOfLoadAverageEvent.getClusterId(); - AutoscalerContext asCtx = AutoscalerContext.getInstance(); - AbstractClusterMonitor monitor; - monitor = asCtx.getClusterMonitor(clusterId); - if (null == monitor) { - if (log.isDebugEnabled()) { - log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); - } - return; - } - monitor.handleGradientOfLoadAverageEvent(gradientOfLoadAverageEvent); - } - }); - - healthStatEventReceiver.addEventListener(new GradientOfMemoryConsumptionEventListener() { - @Override - protected void onEvent(org.apache.stratos.messaging.event.Event event) { - GradientOfMemoryConsumptionEvent gradientOfMemoryConsumptionEvent = (GradientOfMemoryConsumptionEvent) event; - String clusterId = gradientOfMemoryConsumptionEvent.getClusterId(); - AutoscalerContext asCtx = AutoscalerContext.getInstance(); - AbstractClusterMonitor monitor; - monitor = asCtx.getClusterMonitor(clusterId); - if (null == monitor) { - if (log.isDebugEnabled()) { - log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); - } - return; - } - monitor.handleGradientOfMemoryConsumptionEvent(gradientOfMemoryConsumptionEvent); - } - }); - - healthStatEventReceiver.addEventListener(new GradientOfRequestsInFlightEventListener() { - @Override - protected void onEvent(org.apache.stratos.messaging.event.Event event) { - GradientOfRequestsInFlightEvent gradientOfRequestsInFlightEvent = (GradientOfRequestsInFlightEvent) event; - String clusterId = gradientOfRequestsInFlightEvent.getClusterId(); - AutoscalerContext asCtx = AutoscalerContext.getInstance(); - AbstractClusterMonitor monitor; - monitor = asCtx.getClusterMonitor(clusterId); - if (null == monitor) { - if (log.isDebugEnabled()) { - log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); - } - return; - } - monitor.handleGradientOfRequestsInFlightEvent(gradientOfRequestsInFlightEvent); - } - }); - - healthStatEventReceiver.addEventListener(new MemberAverageLoadAverageEventListener() { - @Override - protected void onEvent(org.apache.stratos.messaging.event.Event event) { - MemberAverageLoadAverageEvent memberAverageLoadAverageEvent = (MemberAverageLoadAverageEvent) event; - String memberId = memberAverageLoadAverageEvent.getMemberId(); - Member member = getMemberByMemberId(memberId); - if (null == member) { - if (log.isDebugEnabled()) { - log.debug(String.format("Member not found in the Topology: [member] %s", memberId)); - } - return; - } - if (!member.isActive()) { - if (log.isDebugEnabled()) { - log.debug(String.format("Member activated event has not received for the member %s. " - + "Therefore ignoring" + " the health stat", memberId)); - } - return; - } - AutoscalerContext asCtx = AutoscalerContext.getInstance(); - AbstractClusterMonitor monitor; - String clusterId = member.getClusterId(); - monitor = asCtx.getClusterMonitor(clusterId); - if (null == monitor) { - if (log.isDebugEnabled()) { - log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); - } - return; - } - monitor.handleMemberAverageLoadAverageEvent(memberAverageLoadAverageEvent); - } - }); - - healthStatEventReceiver.addEventListener(new MemberAverageMemoryConsumptionEventListener() { - @Override - protected void onEvent(org.apache.stratos.messaging.event.Event event) { - MemberAverageMemoryConsumptionEvent memberAverageMemoryConsumptionEvent = (MemberAverageMemoryConsumptionEvent) event; - String memberId = memberAverageMemoryConsumptionEvent.getMemberId(); - Member member = getMemberByMemberId(memberId); - if (null == member) { - if (log.isDebugEnabled()) { - log.debug(String.format("Member not found in the Topology: [member] %s", memberId)); - } - return; - } - if (!member.isActive()) { - if (log.isDebugEnabled()) { - log.debug(String.format("Member activated event has not received for the member %s. " - + "Therefore ignoring" + " the health stat", memberId)); - } - return; - } - AutoscalerContext asCtx = AutoscalerContext.getInstance(); - AbstractClusterMonitor monitor; - String clusterId = member.getClusterId(); - monitor = asCtx.getClusterMonitor(clusterId); - if (null == monitor) { - if (log.isDebugEnabled()) { - log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); - } - return; - } - monitor.handleMemberAverageMemoryConsumptionEvent(memberAverageMemoryConsumptionEvent); - } - }); - - healthStatEventReceiver.addEventListener(new MemberFaultEventListener() { - @Override - protected void onEvent(org.apache.stratos.messaging.event.Event event) { - MemberFaultEvent memberFaultEvent = (MemberFaultEvent) event; - String clusterId = memberFaultEvent.getClusterId(); - String memberId = memberFaultEvent.getMemberId(); - if (log.isDebugEnabled()) { - log.debug(String.format("Member fault event: [member] %s ", memberId)); - } - if (memberId == null || memberId.isEmpty()) { - log.error("Member id not found in received message"); - return; - } - AutoscalerContext asCtx = AutoscalerContext.getInstance(); - AbstractClusterMonitor monitor; - monitor = asCtx.getClusterMonitor(clusterId); - if (null == monitor) { - if (log.isDebugEnabled()) { - log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); - } - return; - } - monitor.handleMemberFaultEvent(memberFaultEvent); - } - }); - - healthStatEventReceiver.addEventListener(new MemberGradientOfLoadAverageEventListener() { - @Override - protected void onEvent(org.apache.stratos.messaging.event.Event event) { - MemberGradientOfLoadAverageEvent memberGradientOfLoadAverageEvent = (MemberGradientOfLoadAverageEvent) event; - String memberId = memberGradientOfLoadAverageEvent.getMemberId(); - Member member = getMemberByMemberId(memberId); - if (null == member) { - if (log.isDebugEnabled()) { - log.debug(String.format("Member not found in the Topology: [member] %s", memberId)); - } - return; - } - if (!member.isActive()) { - if (log.isDebugEnabled()) { - log.debug(String.format("Member activated event has not received for the member %s. " - + "Therefore ignoring" + " the health stat", memberId)); - } - return; - } - AutoscalerContext asCtx = AutoscalerContext.getInstance(); - AbstractClusterMonitor monitor; - String clusterId = member.getClusterId(); - monitor = asCtx.getClusterMonitor(clusterId); - if (null == monitor) { - if (log.isDebugEnabled()) { - log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); - } - return; - } - monitor.handleMemberGradientOfLoadAverageEvent(memberGradientOfLoadAverageEvent); - } - }); - - healthStatEventReceiver.addEventListener(new MemberGradientOfMemoryConsumptionEventListener() { - @Override - protected void onEvent(org.apache.stratos.messaging.event.Event event) { - MemberGradientOfMemoryConsumptionEvent memberGradientOfMemoryConsumptionEvent = (MemberGradientOfMemoryConsumptionEvent) event; - String memberId = memberGradientOfMemoryConsumptionEvent.getMemberId(); - Member member = getMemberByMemberId(memberId); - if (null == member) { - if (log.isDebugEnabled()) { - log.debug(String.format("Member not found in the Topology: [member] %s", memberId)); - } - return; - } - if (!member.isActive()) { - if (log.isDebugEnabled()) { - log.debug(String.format("Member activated event has not received for the member %s. " - + "Therefore ignoring" + " the health stat", memberId)); - } - return; - } - AutoscalerContext asCtx = AutoscalerContext.getInstance(); - AbstractClusterMonitor monitor; - String clusterId = member.getClusterId(); - monitor = asCtx.getClusterMonitor(clusterId); - if (null == monitor) { - if (log.isDebugEnabled()) { - log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); - } - return; - } - monitor.handleMemberGradientOfMemoryConsumptionEvent(memberGradientOfMemoryConsumptionEvent); - } - }); - - healthStatEventReceiver.addEventListener(new MemberSecondDerivativeOfLoadAverageEventListener() { - @Override - protected void onEvent(org.apache.stratos.messaging.event.Event event) { - MemberSecondDerivativeOfLoadAverageEvent memberSecondDerivativeOfLoadAverageEvent = (MemberSecondDerivativeOfLoadAverageEvent) event; - String memberId = memberSecondDerivativeOfLoadAverageEvent.getMemberId(); - Member member = getMemberByMemberId(memberId); - if (null == member) { - if (log.isDebugEnabled()) { - log.debug(String.format("Member not found in the Topology: [member] %s", memberId)); - } - return; - } - if (!member.isActive()) { - if (log.isDebugEnabled()) { - log.debug(String.format("Member activated event has not received for the member %s. " - + "Therefore ignoring" + " the health stat", memberId)); - } - return; - } - AutoscalerContext asCtx = AutoscalerContext.getInstance(); - AbstractClusterMonitor monitor; - String clusterId = member.getClusterId(); - monitor = asCtx.getClusterMonitor(clusterId); - if (null == monitor) { - if (log.isDebugEnabled()) { - log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); - } - return; - } - monitor.handleMemberSecondDerivativeOfLoadAverageEvent(memberSecondDerivativeOfLoadAverageEvent); - } - }); - - healthStatEventReceiver.addEventListener(new MemberSecondDerivativeOfMemoryConsumptionEventListener() { - @Override - protected void onEvent(org.apache.stratos.messaging.event.Event event) { - - } - }); - - healthStatEventReceiver.addEventListener(new SecondDerivativeOfLoadAverageEventListener() { - @Override - protected void onEvent(org.apache.stratos.messaging.event.Event event) { - SecondDerivativeOfLoadAverageEvent secondDerivativeOfLoadAverageEvent = (SecondDerivativeOfLoadAverageEvent) event; - String clusterId = secondDerivativeOfLoadAverageEvent.getClusterId(); - AutoscalerContext asCtx = AutoscalerContext.getInstance(); - AbstractClusterMonitor monitor; - monitor = asCtx.getClusterMonitor(clusterId); - if (null == monitor) { - if (log.isDebugEnabled()) { - log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); - } - return; - } - monitor.handleSecondDerivativeOfLoadAverageEvent(secondDerivativeOfLoadAverageEvent); - } - }); - - healthStatEventReceiver.addEventListener(new SecondDerivativeOfMemoryConsumptionEventListener() { - @Override - protected void onEvent(org.apache.stratos.messaging.event.Event event) { - SecondDerivativeOfMemoryConsumptionEvent secondDerivativeOfMemoryConsumptionEvent = (SecondDerivativeOfMemoryConsumptionEvent) event; - String clusterId = secondDerivativeOfMemoryConsumptionEvent.getClusterId(); - AutoscalerContext asCtx = AutoscalerContext.getInstance(); - AbstractClusterMonitor monitor; - monitor = asCtx.getClusterMonitor(clusterId); - if (null == monitor) { - if (log.isDebugEnabled()) { - log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); - } - return; - } - monitor.handleSecondDerivativeOfMemoryConsumptionEvent(secondDerivativeOfMemoryConsumptionEvent); - } - }); - - healthStatEventReceiver.addEventListener(new SecondDerivativeOfRequestsInFlightEventListener() { - @Override - protected void onEvent(org.apache.stratos.messaging.event.Event event) { - SecondDerivativeOfRequestsInFlightEvent secondDerivativeOfRequestsInFlightEvent = (SecondDerivativeOfRequestsInFlightEvent) event; - String clusterId = secondDerivativeOfRequestsInFlightEvent.getClusterId(); - AutoscalerContext asCtx = AutoscalerContext.getInstance(); - AbstractClusterMonitor monitor; - monitor = asCtx.getClusterMonitor(clusterId); - if (null == monitor) { - if (log.isDebugEnabled()) { - log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); - } - return; - } - monitor.handleSecondDerivativeOfRequestsInFlightEvent(secondDerivativeOfRequestsInFlightEvent); - } - }); - } - - private Member getMemberByMemberId(String memberId) { - try { - TopologyManager.acquireReadLock(); - for (Service service : TopologyManager.getTopology().getServices()) { - for (Cluster cluster : service.getClusters()) { - if (cluster.memberExists(memberId)) { - return cluster.getMember(memberId); - } - } - } - return null; - } finally { - TopologyManager.releaseReadLock(); - } - } - - public void terminate() { - this.terminated = true; - } -} http://git-wip-us.apache.org/repos/asf/stratos/blob/ed5feced/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java deleted file mode 100644 index de07b17..0000000 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java +++ /dev/null @@ -1,535 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.stratos.autoscaler.message.receiver.topology; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.autoscaler.AutoscalerContext; -import org.apache.stratos.autoscaler.NetworkPartitionContext; -import org.apache.stratos.autoscaler.applications.ApplicationHolder; -import org.apache.stratos.autoscaler.exception.DependencyBuilderException; -import org.apache.stratos.autoscaler.exception.TopologyInConsistentException; -import org.apache.stratos.autoscaler.grouping.topic.InstanceNotificationPublisher; -import org.apache.stratos.autoscaler.monitor.application.ApplicationMonitor; -import org.apache.stratos.autoscaler.monitor.application.ApplicationMonitorFactory; -import org.apache.stratos.autoscaler.monitor.cluster.AbstractClusterMonitor; -import org.apache.stratos.autoscaler.monitor.cluster.VMClusterMonitor; -import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator; -import org.apache.stratos.messaging.domain.applications.Application; -import org.apache.stratos.messaging.domain.applications.Applications; -import org.apache.stratos.messaging.domain.applications.ClusterDataHolder; -import org.apache.stratos.messaging.domain.topology.ClusterStatus; -import org.apache.stratos.messaging.domain.topology.Service; -import org.apache.stratos.messaging.domain.topology.Topology; -import org.apache.stratos.messaging.event.Event; -import org.apache.stratos.messaging.event.topology.*; -import org.apache.stratos.messaging.listener.topology.*; -import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver; -import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; -import org.drools.runtime.StatefulKnowledgeSession; -import org.drools.runtime.rule.FactHandle; - -/** - * Autoscaler topology receiver. - */ -public class AutoscalerTopologyEventReceiver implements Runnable { - - private static final Log log = LogFactory.getLog(AutoscalerTopologyEventReceiver.class); - - private TopologyEventReceiver topologyEventReceiver; - private boolean terminated; - private boolean topologyInitialized; - - public AutoscalerTopologyEventReceiver() { - this.topologyEventReceiver = new TopologyEventReceiver(); - addEventListeners(); - } - - @Override - public void run() { - //FIXME this activated before autoscaler deployer activated. - try { - Thread.sleep(15000); - } catch (InterruptedException ignore) { - } - Thread thread = new Thread(topologyEventReceiver); - thread.start(); - if (log.isInfoEnabled()) { - log.info("Autoscaler topology receiver thread started"); - } - - // Keep the thread live until terminated - while (!terminated) { - try { - Thread.sleep(1000); - } catch (InterruptedException ignore) { - } - } - if (log.isInfoEnabled()) { - log.info("Autoscaler topology receiver thread terminated"); - } - } - - private boolean allClustersInitialized(Application application) { - boolean allClustersInitialized = false; - for (ClusterDataHolder holder : application.getClusterDataMap().values()) { - TopologyManager.acquireReadLockForCluster(holder.getServiceType(), - holder.getClusterId()); - - try { - Topology topology = TopologyManager.getTopology(); - if (topology != null) { - Service service = topology.getService(holder.getServiceType()); - if (service != null) { - if (service.clusterExists(holder.getClusterId())) { - allClustersInitialized = true; - } else { - if (log.isDebugEnabled()) { - log.debug("[Cluster] " + holder.getClusterId() + " is not found in " + - "the Topology"); - } - allClustersInitialized = false; - return allClustersInitialized; - } - } else { - if (log.isDebugEnabled()) { - log.debug("Service is null in the CompleteTopologyEvent"); - } - } - } else { - if (log.isDebugEnabled()) { - log.debug("Topology is null in the CompleteTopologyEvent"); - } - } - } finally { - TopologyManager.releaseReadLockForCluster(holder.getServiceType(), - holder.getClusterId()); - } - } - return allClustersInitialized; - } - - - private void addEventListeners() { - // Listen to topology events that affect clusters - topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() { - @Override - protected void onEvent(Event event) { - if (!topologyInitialized) { - log.info("[CompleteTopologyEvent] Received: " + event.getClass()); - ApplicationHolder.acquireReadLock(); - try { - Applications applications = ApplicationHolder.getApplications(); - if (applications != null) { - for (Application application : applications.getApplications().values()) { - if (allClustersInitialized(application)) { - startApplicationMonitor(application.getUniqueIdentifier()); - } else { - log.error("Complete Topology is not consistent with the applications " + - "which got persisted"); - } - } - topologyInitialized = true; - } else { - log.info("No applications found in the complete topology"); - } - } catch (Exception e) { - log.error("Error processing event", e); - } finally { - ApplicationHolder.releaseReadLock(); - } - } - } - }); - - - topologyEventReceiver.addEventListener(new ApplicationClustersCreatedEventListener() { - @Override - protected void onEvent(Event event) { - try { - log.info("[ApplicationClustersCreatedEvent] Received: " + event.getClass()); - ApplicationClustersCreatedEvent applicationClustersCreatedEvent = - (ApplicationClustersCreatedEvent) event; - String appId = applicationClustersCreatedEvent.getAppId(); - try { - //acquire read lock - ApplicationHolder.acquireReadLock(); - //start the application monitor - startApplicationMonitor(appId); - } catch (Exception e) { - String msg = "Error processing event " + e.getLocalizedMessage(); - log.error(msg, e); - } finally { - //release read lock - ApplicationHolder.releaseReadLock(); - - } - } catch (ClassCastException e) { - String msg = "Error while casting the event " + e.getLocalizedMessage(); - log.error(msg, e); - } - - } - }); - - topologyEventReceiver.addEventListener(new ClusterActivatedEventListener() { - @Override - protected void onEvent(Event event) { - log.info("[ClusterActivatedEvent] Received: " + event.getClass()); - - ClusterActivatedEvent clusterActivatedEvent = (ClusterActivatedEvent) event; - String clusterId = clusterActivatedEvent.getClusterId(); - AutoscalerContext asCtx = AutoscalerContext.getInstance(); - AbstractClusterMonitor monitor; - monitor = asCtx.getClusterMonitor(clusterId); - if (null == monitor) { - if (log.isDebugEnabled()) { - log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); - } - return; - } - //changing the status in the monitor, will notify its parent monitor - monitor.setStatus(ClusterStatus.Active); - } - }); - - topologyEventReceiver.addEventListener(new ClusterResetEventListener() { - @Override - protected void onEvent(Event event) { - - log.info("[ClusterCreatedEvent] Received: " + event.getClass()); - - ClusterCreatedEvent clusterCreatedEvent = (ClusterCreatedEvent) event; - String clusterId = clusterCreatedEvent.getCluster().getClusterId(); - AutoscalerContext asCtx = AutoscalerContext.getInstance(); - AbstractClusterMonitor monitor; - monitor = asCtx.getClusterMonitor(clusterId); - if (null == monitor) { - if (log.isDebugEnabled()) { - log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); - } - return; - } - - //changing the status in the monitor, will notify its parent monitor - monitor.setStop(true); - monitor.setStatus(ClusterStatus.Created); - - } - }); - - topologyEventReceiver.addEventListener(new ClusterCreatedEventListener() { - @Override - protected void onEvent(Event event) { - log.info("[ClusterCreatedEvent] Received: " + event.getClass()); - } - }); - - topologyEventReceiver.addEventListener(new ClusterInActivateEventListener() { - @Override - protected void onEvent(Event event) { - log.info("[ClusterInActivateEvent] Received: " + event.getClass()); - - ClusterInactivateEvent clusterInactivateEvent = (ClusterInactivateEvent) event; - String clusterId = clusterInactivateEvent.getClusterId(); - AutoscalerContext asCtx = AutoscalerContext.getInstance(); - AbstractClusterMonitor monitor; - monitor = asCtx.getClusterMonitor(clusterId); - if (null == monitor) { - if (log.isDebugEnabled()) { - log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); - } - return; - } - //changing the status in the monitor, will notify its parent monitor - monitor.setStatus(ClusterStatus.Inactive); - } - }); - - topologyEventReceiver.addEventListener(new ClusterTerminatingEventListener() { - @Override - protected void onEvent(Event event) { - - log.info("[ClusterTerminatingEvent] Received: " + event.getClass()); - - ClusterTerminatingEvent clusterTerminatingEvent = (ClusterTerminatingEvent) event; - String clusterId = clusterTerminatingEvent.getClusterId(); - AutoscalerContext asCtx = AutoscalerContext.getInstance(); - AbstractClusterMonitor monitor; - monitor = asCtx.getClusterMonitor(clusterId); - if (null == monitor) { - if (log.isDebugEnabled()) { - log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); - } - return; - } - //changing the status in the monitor, will notify its parent monitor - if (monitor.getStatus() == ClusterStatus.Active) { - // terminated gracefully - monitor.setStatus(ClusterStatus.Terminating); - InstanceNotificationPublisher.sendInstanceCleanupEventForCluster(clusterId); - } else { - monitor.setStatus(ClusterStatus.Terminating); - monitor.terminateAllMembers(); - } - } - }); - - topologyEventReceiver.addEventListener(new ClusterTerminatedEventListener() { - @Override - protected void onEvent(Event event) { - log.info("[ClusterTerminatedEvent] Received: " + event.getClass()); - - ClusterTerminatedEvent clusterTerminatedEvent = (ClusterTerminatedEvent) event; - String clusterId = clusterTerminatedEvent.getClusterId(); - AutoscalerContext asCtx = AutoscalerContext.getInstance(); - AbstractClusterMonitor monitor; - monitor = asCtx.getClusterMonitor(clusterId); - if (null == monitor) { - if (log.isDebugEnabled()) { - log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); - } - return; - } - - //changing the status in the monitor, will notify its parent monitor - monitor.setStatus(ClusterStatus.Terminated); - //Destroying and Removing the Cluster monitor - monitor.destroy(); - AutoscalerContext.getInstance().removeClusterMonitor(clusterId); - } - }); - - topologyEventReceiver.addEventListener(new MemberReadyToShutdownEventListener() { - @Override - protected void onEvent(Event event) { - try { - MemberReadyToShutdownEvent memberReadyToShutdownEvent = (MemberReadyToShutdownEvent) event; - String clusterId = memberReadyToShutdownEvent.getClusterId(); - AutoscalerContext asCtx = AutoscalerContext.getInstance(); - AbstractClusterMonitor monitor; - monitor = asCtx.getClusterMonitor(clusterId); - if (null == monitor) { - if (log.isDebugEnabled()) { - log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); - } - return; - } - monitor.handleMemberReadyToShutdownEvent(memberReadyToShutdownEvent); - } catch (Exception e) { - String msg = "Error processing event " + e.getLocalizedMessage(); - log.error(msg, e); - } - } - }); - - - topologyEventReceiver.addEventListener(new MemberStartedEventListener() { - @Override - protected void onEvent(Event event) { - - } - - }); - - topologyEventReceiver.addEventListener(new MemberTerminatedEventListener() { - @Override - protected void onEvent(Event event) { - try { - MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event; - String clusterId = memberTerminatedEvent.getClusterId(); - AbstractClusterMonitor monitor; - AutoscalerContext asCtx = AutoscalerContext.getInstance(); - monitor = asCtx.getClusterMonitor(clusterId); - if (null == monitor) { - if (log.isDebugEnabled()) { - log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); - } - return; - } - monitor.handleMemberTerminatedEvent(memberTerminatedEvent); - } catch (Exception e) { - String msg = "Error processing event " + e.getLocalizedMessage(); - log.error(msg, e); - } - } - }); - - topologyEventReceiver.addEventListener(new MemberActivatedEventListener() { - @Override - protected void onEvent(Event event) { - try { - MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event; - String clusterId = memberActivatedEvent.getClusterId(); - AbstractClusterMonitor monitor; - AutoscalerContext asCtx = AutoscalerContext.getInstance(); - monitor = asCtx.getClusterMonitor(clusterId); - if (null == monitor) { - if (log.isDebugEnabled()) { - log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); - } - return; - } - monitor.handleMemberActivatedEvent(memberActivatedEvent); - } catch (Exception e) { - String msg = "Error processing event " + e.getLocalizedMessage(); - log.error(msg, e); - } - } - }); - - topologyEventReceiver.addEventListener(new MemberMaintenanceListener() { - @Override - protected void onEvent(Event event) { - try { - MemberMaintenanceModeEvent maintenanceModeEvent = (MemberMaintenanceModeEvent) event; - String clusterId = maintenanceModeEvent.getClusterId(); - AbstractClusterMonitor monitor; - AutoscalerContext asCtx = AutoscalerContext.getInstance(); - monitor = asCtx.getClusterMonitor(clusterId); - if (null == monitor) { - if (log.isDebugEnabled()) { - log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); - } - return; - } - monitor.handleMemberMaintenanceModeEvent(maintenanceModeEvent); - } catch (Exception e) { - String msg = "Error processing event " + e.getLocalizedMessage(); - log.error(msg, e); - } - } - }); - } - - @SuppressWarnings("unused") - private void runTerminateAllRule(VMClusterMonitor monitor) { - - FactHandle terminateAllFactHandle = null; - - StatefulKnowledgeSession terminateAllKnowledgeSession = null; - - for (NetworkPartitionContext networkPartitionContext : monitor.getNetworkPartitionCtxts().values()) { - terminateAllFactHandle = AutoscalerRuleEvaluator.evaluateTerminateAll(terminateAllKnowledgeSession - , terminateAllFactHandle, networkPartitionContext); - } - - } - - /** - * Terminate load balancer topology receiver thread. - */ - public void terminate() { - topologyEventReceiver.terminate(); - terminated = true; - } - - protected synchronized void startApplicationMonitor(String applicationId) { - Thread th = null; - if (!AutoscalerContext.getInstance().appMonitorExist(applicationId)) { - th = new Thread( - new ApplicationMonitorAdder(applicationId)); - } - - if (th != null) { - th.start(); - // try { - // th.join(); - // } catch (InterruptedException ignore) { - - if (log.isDebugEnabled()) { - log.debug(String - .format("Application monitor thread has been started successfully: " + - "[application] %s ", applicationId)); - } - } else { - if (log.isDebugEnabled()) { - log.debug(String - .format("Application monitor thread already exists: " + - "[application] %s ", applicationId)); - } - } - } - - private class ApplicationMonitorAdder implements Runnable { - private String appId; - - public ApplicationMonitorAdder(String appId) { - this.appId = appId; - } - - public void run() { - ApplicationMonitor applicationMonitor = null; - int retries = 5; - boolean success = false; - do { - try { - Thread.sleep(5000); - } catch (InterruptedException e1) { - } - try { - long start = System.currentTimeMillis(); - if (log.isDebugEnabled()) { - log.debug("application monitor is going to be started for [application] " + - appId); - } - applicationMonitor = ApplicationMonitorFactory.getApplicationMonitor(appId); - - long end = System.currentTimeMillis(); - log.info("Time taken to start app monitor: " + (end - start) / 1000); - success = true; - } catch (DependencyBuilderException e) { - String msg = "Application monitor creation failed for Application: "; - log.warn(msg, e); - retries--; - } catch (TopologyInConsistentException e) { - String msg = "Application monitor creation failed for Application: "; - log.warn(msg, e); - retries--; - } - } while (!success && retries != 0); - - if (applicationMonitor == null) { - String msg = "Application monitor creation failed, even after retrying for 5 times, " - + "for Application: " + appId; - log.error(msg); - throw new RuntimeException(msg); - } - - AutoscalerContext.getInstance().addAppMonitor(applicationMonitor); - - if (log.isInfoEnabled()) { - log.info(String.format("Application monitor has been added successfully: " + - "[application] %s", applicationMonitor.getId())); - } - } - } - - -} http://git-wip-us.apache.org/repos/asf/stratos/blob/ed5feced/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ParentComponentMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ParentComponentMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ParentComponentMonitor.java index bd36091..e7cfdb2 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ParentComponentMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ParentComponentMonitor.java @@ -28,7 +28,7 @@ import org.apache.stratos.autoscaler.applications.dependency.DependencyBuilder; import org.apache.stratos.autoscaler.applications.dependency.DependencyTree; import org.apache.stratos.autoscaler.applications.dependency.context.ApplicationContext; import org.apache.stratos.autoscaler.applications.topic.ApplicationsEventPublisher; -import org.apache.stratos.autoscaler.grouping.topic.ClusterStatusEventPublisher; +import org.apache.stratos.autoscaler.event.publisher.ClusterStatusEventPublisher; import org.apache.stratos.autoscaler.status.checker.StatusChecker; import org.apache.stratos.messaging.domain.applications.ParentComponent; import org.apache.stratos.autoscaler.monitor.application.ApplicationMonitorFactory; http://git-wip-us.apache.org/repos/asf/stratos/blob/ed5feced/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMServiceClusterMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMServiceClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMServiceClusterMonitor.java index cc351de..a6ed9aa 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMServiceClusterMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMServiceClusterMonitor.java @@ -28,7 +28,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.stratos.autoscaler.NetworkPartitionContext; import org.apache.stratos.autoscaler.PartitionContext; import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy; -import org.apache.stratos.autoscaler.grouping.topic.ClusterStatusEventPublisher; +import org.apache.stratos.autoscaler.event.publisher.ClusterStatusEventPublisher; import org.apache.stratos.autoscaler.monitor.events.MonitorStatusEvent; import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy; import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator; http://git-wip-us.apache.org/repos/asf/stratos/blob/ed5feced/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java index 250cab3..195f53d 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java @@ -25,7 +25,7 @@ import org.apache.stratos.autoscaler.NetworkPartitionContext; import org.apache.stratos.autoscaler.PartitionContext; import org.apache.stratos.autoscaler.applications.ApplicationHolder; import org.apache.stratos.autoscaler.applications.topic.ApplicationBuilder; -import org.apache.stratos.autoscaler.grouping.topic.ClusterStatusEventPublisher; +import org.apache.stratos.autoscaler.event.publisher.ClusterStatusEventPublisher; import org.apache.stratos.autoscaler.monitor.cluster.VMClusterMonitor; import org.apache.stratos.messaging.domain.applications.*; import org.apache.stratos.messaging.domain.topology.Cluster;
