restructure event related classes
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/ed5feced Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/ed5feced Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/ed5feced Branch: refs/heads/docker-grouping-merge Commit: ed5feced6a6d8806cec8506fa1fdb424761221e9 Parents: e39ba22 Author: Lahiru Sandaruwan <[email protected]> Authored: Tue Nov 4 18:01:51 2014 +0530 Committer: Lahiru Sandaruwan <[email protected]> Committed: Tue Nov 4 18:01:51 2014 +0530 ---------------------------------------------------------------------- .../publisher/ClusterStatusEventPublisher.java | 195 +++++++ .../InstanceNotificationPublisher.java | 53 ++ .../AutoscalerHealthStatEventReceiver.java | 501 +++++++++++++++++ .../AutoscalerTopologyEventReceiver.java | 535 +++++++++++++++++++ .../topic/ClusterStatusEventPublisher.java | 195 ------- .../topic/InstanceNotificationPublisher.java | 53 -- .../internal/AutoscalerServerComponent.java | 4 +- .../AutoscalerHealthStatEventReceiver.java | 500 ----------------- .../AutoscalerTopologyEventReceiver.java | 535 ------------------- .../monitor/ParentComponentMonitor.java | 2 +- .../cluster/VMServiceClusterMonitor.java | 2 +- .../status/checker/StatusChecker.java | 2 +- 12 files changed, 1289 insertions(+), 1288 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/ed5feced/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/publisher/ClusterStatusEventPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/publisher/ClusterStatusEventPublisher.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/publisher/ClusterStatusEventPublisher.java new file mode 100644 index 
0000000..22d6eb9 --- /dev/null +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/publisher/ClusterStatusEventPublisher.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.stratos.autoscaler.event.publisher; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.broker.publish.EventPublisher; +import org.apache.stratos.messaging.broker.publish.EventPublisherPool; +import org.apache.stratos.messaging.domain.topology.Cluster; +import org.apache.stratos.messaging.domain.topology.ClusterStatus; +import org.apache.stratos.messaging.domain.topology.Service; +import org.apache.stratos.messaging.event.Event; +import org.apache.stratos.messaging.event.cluster.status.*; +import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; +import org.apache.stratos.messaging.util.Constants; + +/** + * This will publish cluster status events to cluster-status topic + */ +public class ClusterStatusEventPublisher { + private static final Log log = LogFactory.getLog(ClusterStatusEventPublisher.class); + + + public static void sendClusterCreatedEvent(String appId, String serviceName, String clusterId) { + try { + TopologyManager.acquireReadLockForCluster(serviceName, clusterId); + Service service = TopologyManager.getTopology().getService(serviceName); + if (service != null) { + Cluster cluster = service.getCluster(clusterId); + if (cluster.isStateTransitionValid(ClusterStatus.Created)) { + if (log.isInfoEnabled()) { + log.info("Publishing Cluster created event for [application]: " + appId + + " [cluster]: " + clusterId); + } + /*ClusterStatusClusterCreatedEvent clusterCreatedEvent = + new ClusterStatusClusterCreatedEvent(appId, serviceName, clusterId); + + publishEvent(clusterCreatedEvent);*/ + } else { + log.warn("Created is not in the possible state list of [cluster] " + clusterId); + } + } + } finally { + TopologyManager.releaseReadLockForCluster(serviceName, clusterId); + } + } + + public static void sendClusterResetEvent(String appId, String serviceName, String clusterId) { + try { + TopologyManager.acquireReadLockForCluster(serviceName, 
clusterId); + Service service = TopologyManager.getTopology().getService(serviceName); + if (service != null) { + Cluster cluster = service.getCluster(clusterId); + if (cluster.isStateTransitionValid(ClusterStatus.Created)) { + if (log.isInfoEnabled()) { + log.info("Publishing Cluster created event for [application]: " + appId + + " [cluster]: " + clusterId); + } + ClusterStatusClusterResetEvent clusterCreatedEvent = + new ClusterStatusClusterResetEvent(appId, serviceName, clusterId); + + publishEvent(clusterCreatedEvent); + } else { + log.warn("Created is not in the possible state list of [cluster] " + clusterId); + } + } + } finally { + TopologyManager.releaseReadLockForCluster(serviceName, clusterId); + } + } + + public static void sendClusterActivatedEvent(String appId, String serviceName, String clusterId) { + try { + TopologyManager.acquireReadLockForCluster(serviceName, clusterId); + Service service = TopologyManager.getTopology().getService(serviceName); + if (service != null) { + Cluster cluster = service.getCluster(clusterId); + if (cluster.isStateTransitionValid(ClusterStatus.Active)) { + if (log.isInfoEnabled()) { + log.info("Publishing Cluster activated event for [application]: " + appId + + " [cluster]: " + clusterId); + } + ClusterStatusClusterActivatedEvent clusterActivatedEvent = + new ClusterStatusClusterActivatedEvent(appId, serviceName, clusterId); + + publishEvent(clusterActivatedEvent); + } else { + log.warn("Active is not in the possible state list of [cluster] " + clusterId); + } + } + } finally { + TopologyManager.releaseReadLockForCluster(serviceName, clusterId); + } + } + + public static void sendClusterInActivateEvent(String appId, String serviceName, String clusterId) { + try { + TopologyManager.acquireReadLockForCluster(serviceName, clusterId); + Service service = TopologyManager.getTopology().getService(serviceName); + if (service != null) { + Cluster cluster = service.getCluster(clusterId); + if 
(cluster.isStateTransitionValid(ClusterStatus.Inactive)) { + if (log.isInfoEnabled()) { + log.info("Publishing Cluster in-activate event for [application]: " + appId + + " [cluster]: " + clusterId); + } + ClusterStatusClusterInactivateEvent clusterInActivateEvent = + new ClusterStatusClusterInactivateEvent(appId, serviceName, clusterId); + + publishEvent(clusterInActivateEvent); + } else { + log.warn("In-active is not in the possible state list of [cluster] " + clusterId); + } + } + } finally { + TopologyManager.releaseReadLockForCluster(serviceName, clusterId); + + } + } + + public static void sendClusterTerminatingEvent(String appId, String serviceName, String clusterId) { + + try { + TopologyManager.acquireReadLockForCluster(serviceName, clusterId); + Service service = TopologyManager.getTopology().getService(serviceName); + if (service != null) { + Cluster cluster = service.getCluster(clusterId); + if (cluster.isStateTransitionValid(ClusterStatus.Terminating)) { + if (log.isInfoEnabled()) { + log.info("Publishing Cluster Terminating event for [application]: " + appId + + " [cluster]: " + clusterId); + } + ClusterStatusClusterTerminatingEvent appStatusClusterTerminatingEvent = + new ClusterStatusClusterTerminatingEvent(appId, serviceName, clusterId); + + publishEvent(appStatusClusterTerminatingEvent); + } else { + log.warn("Terminating is not in the possible state list of [cluster] " + clusterId); + } + } + } finally { + TopologyManager.releaseReadLockForCluster(serviceName, clusterId); + + } + + } + + public static void sendClusterTerminatedEvent(String appId, String serviceName, String clusterId) { + try { + TopologyManager.acquireReadLockForCluster(serviceName, clusterId); + Service service = TopologyManager.getTopology().getService(serviceName); + if (service != null) { + Cluster cluster = service.getCluster(clusterId); + if (cluster.isStateTransitionValid(ClusterStatus.Terminated)) { + if (log.isInfoEnabled()) { + log.info("Publishing Cluster terminated 
event for [application]: " + appId + + " [cluster]: " + clusterId); + } + ClusterStatusClusterTerminatedEvent appStatusClusterTerminatedEvent = + new ClusterStatusClusterTerminatedEvent(appId, serviceName, clusterId); + + publishEvent(appStatusClusterTerminatedEvent); + } else { + log.warn("Terminated is not in the possible state list of [cluster] " + clusterId); + } + } + } finally { + TopologyManager.releaseReadLockForCluster(serviceName, clusterId); + + } + } + + + public static void publishEvent(Event event) { + //publishing events to application status topic + EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.CLUSTER_STATUS_TOPIC); + eventPublisher.publish(event); + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/ed5feced/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/publisher/InstanceNotificationPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/publisher/InstanceNotificationPublisher.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/publisher/InstanceNotificationPublisher.java new file mode 100644 index 0000000..19c5e17 --- /dev/null +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/publisher/InstanceNotificationPublisher.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.stratos.autoscaler.event.publisher;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.messaging.broker.publish.EventPublisher;
import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
import org.apache.stratos.messaging.event.Event;
import org.apache.stratos.messaging.event.instance.notifier.InstanceCleanupClusterEvent;
import org.apache.stratos.messaging.event.instance.notifier.InstanceCleanupMemberEvent;
import org.apache.stratos.messaging.util.Constants;

/**
 * Publishes instance cleanup notifications on the instance notifier topic
 * ({@code Constants.INSTANCE_NOTIFIER_TOPIC}).
 */
public class InstanceNotificationPublisher {
    private static final Log log = LogFactory.getLog(InstanceNotificationPublisher.class);

    /**
     * Hands the event to the publisher registered for the instance notifier topic.
     *
     * @param event event to publish
     */
    private static void publish(Event event) {
        EventPublisher notifierTopicPublisher =
                EventPublisherPool.getPublisher(Constants.INSTANCE_NOTIFIER_TOPIC);
        notifierTopicPublisher.publish(event);
    }

    /**
     * Logs and publishes an instance cleanup event addressed to a whole cluster.
     *
     * @param clusterId id of the cluster carried by the event
     */
    public static void sendInstanceCleanupEventForCluster(String clusterId) {
        String logLine = String.format("Publishing Instance Cleanup Event: [cluster] %s", clusterId);
        log.info(logLine);

        InstanceCleanupClusterEvent cleanupEvent = new InstanceCleanupClusterEvent(clusterId);
        publish(cleanupEvent);
    }

    /**
     * Logs and publishes an instance cleanup event addressed to a single member.
     * Unlike the cluster variant this is an instance method, so callers need a
     * publisher object.
     *
     * @param memberId id of the member carried by the event
     */
    public void sendInstanceCleanupEventForMember(String memberId) {
        String logLine = String.format("Publishing Instance Cleanup Event: [member] %s", memberId);
        log.info(logLine);

        InstanceCleanupMemberEvent cleanupEvent = new InstanceCleanupMemberEvent(memberId);
        publish(cleanupEvent);
    }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/ed5feced/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java new file mode 100644 index 0000000..718cc16 --- /dev/null +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java @@ -0,0 +1,501 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.stratos.autoscaler.event.receiver.health; + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.autoscaler.AutoscalerContext; +import org.apache.stratos.autoscaler.monitor.cluster.AbstractClusterMonitor; +import org.apache.stratos.messaging.domain.topology.Cluster; +import org.apache.stratos.messaging.domain.topology.Member; +import org.apache.stratos.messaging.domain.topology.Service; +import org.apache.stratos.messaging.event.health.stat.AverageLoadAverageEvent; +import org.apache.stratos.messaging.event.health.stat.AverageMemoryConsumptionEvent; +import org.apache.stratos.messaging.event.health.stat.AverageRequestsInFlightEvent; +import org.apache.stratos.messaging.event.health.stat.GradientOfLoadAverageEvent; +import org.apache.stratos.messaging.event.health.stat.GradientOfMemoryConsumptionEvent; +import org.apache.stratos.messaging.event.health.stat.GradientOfRequestsInFlightEvent; +import org.apache.stratos.messaging.event.health.stat.MemberAverageLoadAverageEvent; +import org.apache.stratos.messaging.event.health.stat.MemberAverageMemoryConsumptionEvent; +import org.apache.stratos.messaging.event.health.stat.MemberFaultEvent; +import org.apache.stratos.messaging.event.health.stat.MemberGradientOfLoadAverageEvent; +import org.apache.stratos.messaging.event.health.stat.MemberGradientOfMemoryConsumptionEvent; +import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfLoadAverageEvent; +import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfLoadAverageEvent; +import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfMemoryConsumptionEvent; +import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfRequestsInFlightEvent; +import org.apache.stratos.messaging.listener.health.stat.AverageLoadAverageEventListener; +import org.apache.stratos.messaging.listener.health.stat.AverageMemoryConsumptionEventListener; 
+import org.apache.stratos.messaging.listener.health.stat.AverageRequestsInFlightEventListener; +import org.apache.stratos.messaging.listener.health.stat.GradientOfLoadAverageEventListener; +import org.apache.stratos.messaging.listener.health.stat.GradientOfMemoryConsumptionEventListener; +import org.apache.stratos.messaging.listener.health.stat.GradientOfRequestsInFlightEventListener; +import org.apache.stratos.messaging.listener.health.stat.MemberAverageLoadAverageEventListener; +import org.apache.stratos.messaging.listener.health.stat.MemberAverageMemoryConsumptionEventListener; +import org.apache.stratos.messaging.listener.health.stat.MemberFaultEventListener; +import org.apache.stratos.messaging.listener.health.stat.MemberGradientOfLoadAverageEventListener; +import org.apache.stratos.messaging.listener.health.stat.MemberGradientOfMemoryConsumptionEventListener; +import org.apache.stratos.messaging.listener.health.stat.MemberSecondDerivativeOfLoadAverageEventListener; +import org.apache.stratos.messaging.listener.health.stat.MemberSecondDerivativeOfMemoryConsumptionEventListener; +import org.apache.stratos.messaging.listener.health.stat.SecondDerivativeOfLoadAverageEventListener; +import org.apache.stratos.messaging.listener.health.stat.SecondDerivativeOfMemoryConsumptionEventListener; +import org.apache.stratos.messaging.listener.health.stat.SecondDerivativeOfRequestsInFlightEventListener; +import org.apache.stratos.messaging.message.receiver.health.stat.HealthStatEventReceiver; +import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; + + +/** + * A thread for processing topology messages and updating the topology data structure. 
+ */ +public class AutoscalerHealthStatEventReceiver implements Runnable { + + private static final Log log = LogFactory.getLog(AutoscalerHealthStatEventReceiver.class); + private boolean terminated = false; + + private HealthStatEventReceiver healthStatEventReceiver; + + public AutoscalerHealthStatEventReceiver() { + this.healthStatEventReceiver = new HealthStatEventReceiver(); + addEventListeners(); + } + + @Override + public void run() { + //FIXME this activated before autoscaler deployer activated. + try { + Thread.sleep(15000); + } catch (InterruptedException ignore) { + } + Thread thread = new Thread(healthStatEventReceiver); + thread.start(); + if(log.isInfoEnabled()) { + log.info("Autoscaler health stat event receiver thread started"); + } + + // Keep the thread live until terminated + while (!terminated){ + try { + Thread.sleep(1000); + } catch (InterruptedException ignore) { + } + } + if(log.isInfoEnabled()) { + log.info("Autoscaler health stat event receiver thread terminated"); + } + } + + private void addEventListeners() { + // Listen to health stat events that affect clusters + healthStatEventReceiver.addEventListener(new AverageLoadAverageEventListener() { + @Override + protected void onEvent(org.apache.stratos.messaging.event.Event event) { + AverageLoadAverageEvent averageLoadAverageEvent = (AverageLoadAverageEvent) event; + String clusterId = averageLoadAverageEvent.getClusterId(); + AutoscalerContext asCtx = AutoscalerContext.getInstance(); + AbstractClusterMonitor monitor; + monitor = asCtx.getClusterMonitor(clusterId); + if (null == monitor) { + if (log.isDebugEnabled()) { + log.debug(String.format("A cluster monitor is not found in autoscaler context " + + "[cluster] %s", clusterId)); + } + return; + } + monitor.handleAverageLoadAverageEvent(averageLoadAverageEvent); + } + + }); + healthStatEventReceiver.addEventListener(new AverageMemoryConsumptionEventListener() { + @Override + protected void onEvent(org.apache.stratos.messaging.event.Event 
event) { + AverageMemoryConsumptionEvent averageMemoryConsumptionEvent = (AverageMemoryConsumptionEvent) event; + String clusterId = averageMemoryConsumptionEvent.getClusterId(); + AutoscalerContext asCtx = AutoscalerContext.getInstance(); + AbstractClusterMonitor monitor; + monitor = asCtx.getClusterMonitor(clusterId); + if (null == monitor) { + if (log.isDebugEnabled()) { + log.debug(String.format("A cluster monitor is not found in autoscaler context " + + "[cluster] %s", clusterId)); + } + return; + } + monitor.handleAverageMemoryConsumptionEvent(averageMemoryConsumptionEvent); + } + }); + + healthStatEventReceiver.addEventListener(new AverageRequestsInFlightEventListener() { + @Override + protected void onEvent(org.apache.stratos.messaging.event.Event event) { + AverageRequestsInFlightEvent averageRequestsInFlightEvent = (AverageRequestsInFlightEvent) event; + String clusterId = averageRequestsInFlightEvent.getClusterId(); + AutoscalerContext asCtx = AutoscalerContext.getInstance(); + AbstractClusterMonitor monitor; + monitor = asCtx.getClusterMonitor(clusterId); + if (null == monitor) { + if (log.isDebugEnabled()) { + log.debug(String.format("A cluster monitor is not found in autoscaler context " + + "[cluster] %s", clusterId)); + } + return; + } + monitor.handleAverageRequestsInFlightEvent(averageRequestsInFlightEvent); + } + }); + + healthStatEventReceiver.addEventListener(new GradientOfLoadAverageEventListener() { + @Override + protected void onEvent(org.apache.stratos.messaging.event.Event event) { + GradientOfLoadAverageEvent gradientOfLoadAverageEvent = (GradientOfLoadAverageEvent) event; + String clusterId = gradientOfLoadAverageEvent.getClusterId(); + AutoscalerContext asCtx = AutoscalerContext.getInstance(); + AbstractClusterMonitor monitor; + monitor = asCtx.getClusterMonitor(clusterId); + if (null == monitor) { + if (log.isDebugEnabled()) { + log.debug(String.format("A cluster monitor is not found in autoscaler context " + + "[cluster] %s", 
clusterId)); + } + return; + } + monitor.handleGradientOfLoadAverageEvent(gradientOfLoadAverageEvent); + } + }); + + healthStatEventReceiver.addEventListener(new GradientOfMemoryConsumptionEventListener() { + @Override + protected void onEvent(org.apache.stratos.messaging.event.Event event) { + GradientOfMemoryConsumptionEvent gradientOfMemoryConsumptionEvent = (GradientOfMemoryConsumptionEvent) event; + String clusterId = gradientOfMemoryConsumptionEvent.getClusterId(); + AutoscalerContext asCtx = AutoscalerContext.getInstance(); + AbstractClusterMonitor monitor; + monitor = asCtx.getClusterMonitor(clusterId); + if (null == monitor) { + if (log.isDebugEnabled()) { + log.debug(String.format("A cluster monitor is not found in autoscaler context " + + "[cluster] %s", clusterId)); + } + return; + } + monitor.handleGradientOfMemoryConsumptionEvent(gradientOfMemoryConsumptionEvent); + } + }); + + healthStatEventReceiver.addEventListener(new GradientOfRequestsInFlightEventListener() { + @Override + protected void onEvent(org.apache.stratos.messaging.event.Event event) { + GradientOfRequestsInFlightEvent gradientOfRequestsInFlightEvent = (GradientOfRequestsInFlightEvent) event; + String clusterId = gradientOfRequestsInFlightEvent.getClusterId(); + AutoscalerContext asCtx = AutoscalerContext.getInstance(); + AbstractClusterMonitor monitor; + monitor = asCtx.getClusterMonitor(clusterId); + if (null == monitor) { + if (log.isDebugEnabled()) { + log.debug(String.format("A cluster monitor is not found in autoscaler context " + + "[cluster] %s", clusterId)); + } + return; + } + monitor.handleGradientOfRequestsInFlightEvent(gradientOfRequestsInFlightEvent); + } + }); + + healthStatEventReceiver.addEventListener(new MemberAverageLoadAverageEventListener() { + @Override + protected void onEvent(org.apache.stratos.messaging.event.Event event) { + MemberAverageLoadAverageEvent memberAverageLoadAverageEvent = (MemberAverageLoadAverageEvent) event; + String memberId = 
memberAverageLoadAverageEvent.getMemberId(); + Member member = getMemberByMemberId(memberId); + if (null == member) { + if (log.isDebugEnabled()) { + log.debug(String.format("Member not found in the Topology: [member] %s", memberId)); + } + return; + } + if (!member.isActive()) { + if (log.isDebugEnabled()) { + log.debug(String.format("Member activated event has not received for the member %s. " + + "Therefore ignoring" + " the health stat", memberId)); + } + return; + } + AutoscalerContext asCtx = AutoscalerContext.getInstance(); + AbstractClusterMonitor monitor; + String clusterId = member.getClusterId(); + monitor = asCtx.getClusterMonitor(clusterId); + if (null == monitor) { + if (log.isDebugEnabled()) { + log.debug(String.format("A cluster monitor is not found in autoscaler context " + + "[cluster] %s", clusterId)); + } + return; + } + monitor.handleMemberAverageLoadAverageEvent(memberAverageLoadAverageEvent); + } + }); + + healthStatEventReceiver.addEventListener(new MemberAverageMemoryConsumptionEventListener() { + @Override + protected void onEvent(org.apache.stratos.messaging.event.Event event) { + MemberAverageMemoryConsumptionEvent memberAverageMemoryConsumptionEvent = (MemberAverageMemoryConsumptionEvent) event; + String memberId = memberAverageMemoryConsumptionEvent.getMemberId(); + Member member = getMemberByMemberId(memberId); + if (null == member) { + if (log.isDebugEnabled()) { + log.debug(String.format("Member not found in the Topology: [member] %s", memberId)); + } + return; + } + if (!member.isActive()) { + if (log.isDebugEnabled()) { + log.debug(String.format("Member activated event has not received for the member %s. 
" + + "Therefore ignoring" + " the health stat", memberId)); + } + return; + } + AutoscalerContext asCtx = AutoscalerContext.getInstance(); + AbstractClusterMonitor monitor; + String clusterId = member.getClusterId(); + monitor = asCtx.getClusterMonitor(clusterId); + if (null == monitor) { + if (log.isDebugEnabled()) { + log.debug(String.format("A cluster monitor is not found in autoscaler context " + + "[cluster] %s", clusterId)); + } + return; + } + monitor.handleMemberAverageMemoryConsumptionEvent(memberAverageMemoryConsumptionEvent); + } + }); + + healthStatEventReceiver.addEventListener(new MemberFaultEventListener() { + @Override + protected void onEvent(org.apache.stratos.messaging.event.Event event) { + MemberFaultEvent memberFaultEvent = (MemberFaultEvent) event; + String clusterId = memberFaultEvent.getClusterId(); + String memberId = memberFaultEvent.getMemberId(); + if (log.isDebugEnabled()) { + log.debug(String.format("Member fault event: [member] %s ", memberId)); + } + if (memberId == null || memberId.isEmpty()) { + log.error("Member id not found in received message"); + return; + } + AutoscalerContext asCtx = AutoscalerContext.getInstance(); + AbstractClusterMonitor monitor; + monitor = asCtx.getClusterMonitor(clusterId); + if (null == monitor) { + if (log.isDebugEnabled()) { + log.debug(String.format("A cluster monitor is not found in autoscaler context " + + "[cluster] %s", clusterId)); + } + return; + } + monitor.handleMemberFaultEvent(memberFaultEvent); + } + }); + + healthStatEventReceiver.addEventListener(new MemberGradientOfLoadAverageEventListener() { + @Override + protected void onEvent(org.apache.stratos.messaging.event.Event event) { + MemberGradientOfLoadAverageEvent memberGradientOfLoadAverageEvent = (MemberGradientOfLoadAverageEvent) event; + String memberId = memberGradientOfLoadAverageEvent.getMemberId(); + Member member = getMemberByMemberId(memberId); + if (null == member) { + if (log.isDebugEnabled()) { + 
log.debug(String.format("Member not found in the Topology: [member] %s", memberId)); + } + return; + } + if (!member.isActive()) { + if (log.isDebugEnabled()) { + log.debug(String.format("Member activated event has not received for the member %s. " + + "Therefore ignoring" + " the health stat", memberId)); + } + return; + } + AutoscalerContext asCtx = AutoscalerContext.getInstance(); + AbstractClusterMonitor monitor; + String clusterId = member.getClusterId(); + monitor = asCtx.getClusterMonitor(clusterId); + if (null == monitor) { + if (log.isDebugEnabled()) { + log.debug(String.format("A cluster monitor is not found in autoscaler context " + + "[cluster] %s", clusterId)); + } + return; + } + monitor.handleMemberGradientOfLoadAverageEvent(memberGradientOfLoadAverageEvent); + } + }); + + healthStatEventReceiver.addEventListener(new MemberGradientOfMemoryConsumptionEventListener() { + @Override + protected void onEvent(org.apache.stratos.messaging.event.Event event) { + MemberGradientOfMemoryConsumptionEvent memberGradientOfMemoryConsumptionEvent = (MemberGradientOfMemoryConsumptionEvent) event; + String memberId = memberGradientOfMemoryConsumptionEvent.getMemberId(); + Member member = getMemberByMemberId(memberId); + if (null == member) { + if (log.isDebugEnabled()) { + log.debug(String.format("Member not found in the Topology: [member] %s", memberId)); + } + return; + } + if (!member.isActive()) { + if (log.isDebugEnabled()) { + log.debug(String.format("Member activated event has not received for the member %s. 
" + + "Therefore ignoring" + " the health stat", memberId)); + } + return; + } + AutoscalerContext asCtx = AutoscalerContext.getInstance(); + AbstractClusterMonitor monitor; + String clusterId = member.getClusterId(); + monitor = asCtx.getClusterMonitor(clusterId); + if (null == monitor) { + if (log.isDebugEnabled()) { + log.debug(String.format("A cluster monitor is not found in autoscaler context " + + "[cluster] %s", clusterId)); + } + return; + } + monitor.handleMemberGradientOfMemoryConsumptionEvent(memberGradientOfMemoryConsumptionEvent); + } + }); + + healthStatEventReceiver.addEventListener(new MemberSecondDerivativeOfLoadAverageEventListener() { + @Override + protected void onEvent(org.apache.stratos.messaging.event.Event event) { + MemberSecondDerivativeOfLoadAverageEvent memberSecondDerivativeOfLoadAverageEvent = (MemberSecondDerivativeOfLoadAverageEvent) event; + String memberId = memberSecondDerivativeOfLoadAverageEvent.getMemberId(); + Member member = getMemberByMemberId(memberId); + if (null == member) { + if (log.isDebugEnabled()) { + log.debug(String.format("Member not found in the Topology: [member] %s", memberId)); + } + return; + } + if (!member.isActive()) { + if (log.isDebugEnabled()) { + log.debug(String.format("Member activated event has not received for the member %s. 
" + + "Therefore ignoring" + " the health stat", memberId)); + } + return; + } + AutoscalerContext asCtx = AutoscalerContext.getInstance(); + AbstractClusterMonitor monitor; + String clusterId = member.getClusterId(); + monitor = asCtx.getClusterMonitor(clusterId); + if (null == monitor) { + if (log.isDebugEnabled()) { + log.debug(String.format("A cluster monitor is not found in autoscaler context " + + "[cluster] %s", clusterId)); + } + return; + } + monitor.handleMemberSecondDerivativeOfLoadAverageEvent(memberSecondDerivativeOfLoadAverageEvent); + } + }); + + healthStatEventReceiver.addEventListener(new MemberSecondDerivativeOfMemoryConsumptionEventListener() { + @Override + protected void onEvent(org.apache.stratos.messaging.event.Event event) { + + } + }); + + healthStatEventReceiver.addEventListener(new SecondDerivativeOfLoadAverageEventListener() { + @Override + protected void onEvent(org.apache.stratos.messaging.event.Event event) { + SecondDerivativeOfLoadAverageEvent secondDerivativeOfLoadAverageEvent = (SecondDerivativeOfLoadAverageEvent) event; + String clusterId = secondDerivativeOfLoadAverageEvent.getClusterId(); + AutoscalerContext asCtx = AutoscalerContext.getInstance(); + AbstractClusterMonitor monitor; + monitor = asCtx.getClusterMonitor(clusterId); + if (null == monitor) { + if (log.isDebugEnabled()) { + log.debug(String.format("A cluster monitor is not found in autoscaler context " + + "[cluster] %s", clusterId)); + } + return; + } + monitor.handleSecondDerivativeOfLoadAverageEvent(secondDerivativeOfLoadAverageEvent); + } + }); + + healthStatEventReceiver.addEventListener(new SecondDerivativeOfMemoryConsumptionEventListener() { + @Override + protected void onEvent(org.apache.stratos.messaging.event.Event event) { + SecondDerivativeOfMemoryConsumptionEvent secondDerivativeOfMemoryConsumptionEvent = (SecondDerivativeOfMemoryConsumptionEvent) event; + String clusterId = secondDerivativeOfMemoryConsumptionEvent.getClusterId(); + AutoscalerContext 
asCtx = AutoscalerContext.getInstance(); + AbstractClusterMonitor monitor; + monitor = asCtx.getClusterMonitor(clusterId); + if (null == monitor) { + if (log.isDebugEnabled()) { + log.debug(String.format("A cluster monitor is not found in autoscaler context " + + "[cluster] %s", clusterId)); + } + return; + } + monitor.handleSecondDerivativeOfMemoryConsumptionEvent(secondDerivativeOfMemoryConsumptionEvent); + } + }); + + healthStatEventReceiver.addEventListener(new SecondDerivativeOfRequestsInFlightEventListener() { + @Override + protected void onEvent(org.apache.stratos.messaging.event.Event event) { + SecondDerivativeOfRequestsInFlightEvent secondDerivativeOfRequestsInFlightEvent = (SecondDerivativeOfRequestsInFlightEvent) event; + String clusterId = secondDerivativeOfRequestsInFlightEvent.getClusterId(); + AutoscalerContext asCtx = AutoscalerContext.getInstance(); + AbstractClusterMonitor monitor; + monitor = asCtx.getClusterMonitor(clusterId); + if (null == monitor) { + if (log.isDebugEnabled()) { + log.debug(String.format("A cluster monitor is not found in autoscaler context " + + "[cluster] %s", clusterId)); + } + return; + } + monitor.handleSecondDerivativeOfRequestsInFlightEvent(secondDerivativeOfRequestsInFlightEvent); + } + }); + } + + private Member getMemberByMemberId(String memberId) { + try { + TopologyManager.acquireReadLock(); + for (Service service : TopologyManager.getTopology().getServices()) { + for (Cluster cluster : service.getClusters()) { + if (cluster.memberExists(memberId)) { + return cluster.getMember(memberId); + } + } + } + return null; + } finally { + TopologyManager.releaseReadLock(); + } + } + + public void terminate() { + this.terminated = true; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/ed5feced/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java ---------------------------------------------------------------------- diff 
--git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java new file mode 100644 index 0000000..f01f8f9 --- /dev/null +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java @@ -0,0 +1,535 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.stratos.autoscaler.event.receiver.topology; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.autoscaler.AutoscalerContext; +import org.apache.stratos.autoscaler.NetworkPartitionContext; +import org.apache.stratos.autoscaler.applications.ApplicationHolder; +import org.apache.stratos.autoscaler.exception.DependencyBuilderException; +import org.apache.stratos.autoscaler.exception.TopologyInConsistentException; +import org.apache.stratos.autoscaler.event.publisher.InstanceNotificationPublisher; +import org.apache.stratos.autoscaler.monitor.application.ApplicationMonitor; +import org.apache.stratos.autoscaler.monitor.application.ApplicationMonitorFactory; +import org.apache.stratos.autoscaler.monitor.cluster.AbstractClusterMonitor; +import org.apache.stratos.autoscaler.monitor.cluster.VMClusterMonitor; +import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator; +import org.apache.stratos.messaging.domain.applications.Application; +import org.apache.stratos.messaging.domain.applications.Applications; +import org.apache.stratos.messaging.domain.applications.ClusterDataHolder; +import org.apache.stratos.messaging.domain.topology.ClusterStatus; +import org.apache.stratos.messaging.domain.topology.Service; +import org.apache.stratos.messaging.domain.topology.Topology; +import org.apache.stratos.messaging.event.Event; +import org.apache.stratos.messaging.event.topology.*; +import org.apache.stratos.messaging.listener.topology.*; +import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver; +import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; +import org.drools.runtime.StatefulKnowledgeSession; +import org.drools.runtime.rule.FactHandle; + +/** + * Autoscaler topology receiver. 
+ */ +public class AutoscalerTopologyEventReceiver implements Runnable { + + private static final Log log = LogFactory.getLog(AutoscalerTopologyEventReceiver.class); + + private TopologyEventReceiver topologyEventReceiver; + private boolean terminated; + private boolean topologyInitialized; + + public AutoscalerTopologyEventReceiver() { + this.topologyEventReceiver = new TopologyEventReceiver(); + addEventListeners(); + } + + @Override + public void run() { + //FIXME this activated before autoscaler deployer activated. + try { + Thread.sleep(15000); + } catch (InterruptedException ignore) { + } + Thread thread = new Thread(topologyEventReceiver); + thread.start(); + if (log.isInfoEnabled()) { + log.info("Autoscaler topology receiver thread started"); + } + + // Keep the thread live until terminated + while (!terminated) { + try { + Thread.sleep(1000); + } catch (InterruptedException ignore) { + } + } + if (log.isInfoEnabled()) { + log.info("Autoscaler topology receiver thread terminated"); + } + } + + private boolean allClustersInitialized(Application application) { + boolean allClustersInitialized = false; + for (ClusterDataHolder holder : application.getClusterDataMap().values()) { + TopologyManager.acquireReadLockForCluster(holder.getServiceType(), + holder.getClusterId()); + + try { + Topology topology = TopologyManager.getTopology(); + if (topology != null) { + Service service = topology.getService(holder.getServiceType()); + if (service != null) { + if (service.clusterExists(holder.getClusterId())) { + allClustersInitialized = true; + } else { + if (log.isDebugEnabled()) { + log.debug("[Cluster] " + holder.getClusterId() + " is not found in " + + "the Topology"); + } + allClustersInitialized = false; + return allClustersInitialized; + } + } else { + if (log.isDebugEnabled()) { + log.debug("Service is null in the CompleteTopologyEvent"); + } + } + } else { + if (log.isDebugEnabled()) { + log.debug("Topology is null in the CompleteTopologyEvent"); + } + } + } 
finally { + TopologyManager.releaseReadLockForCluster(holder.getServiceType(), + holder.getClusterId()); + } + } + return allClustersInitialized; + } + + + private void addEventListeners() { + // Listen to topology events that affect clusters + topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() { + @Override + protected void onEvent(Event event) { + if (!topologyInitialized) { + log.info("[CompleteTopologyEvent] Received: " + event.getClass()); + ApplicationHolder.acquireReadLock(); + try { + Applications applications = ApplicationHolder.getApplications(); + if (applications != null) { + for (Application application : applications.getApplications().values()) { + if (allClustersInitialized(application)) { + startApplicationMonitor(application.getUniqueIdentifier()); + } else { + log.error("Complete Topology is not consistent with the applications " + + "which got persisted"); + } + } + topologyInitialized = true; + } else { + log.info("No applications found in the complete topology"); + } + } catch (Exception e) { + log.error("Error processing event", e); + } finally { + ApplicationHolder.releaseReadLock(); + } + } + } + }); + + + topologyEventReceiver.addEventListener(new ApplicationClustersCreatedEventListener() { + @Override + protected void onEvent(Event event) { + try { + log.info("[ApplicationClustersCreatedEvent] Received: " + event.getClass()); + ApplicationClustersCreatedEvent applicationClustersCreatedEvent = + (ApplicationClustersCreatedEvent) event; + String appId = applicationClustersCreatedEvent.getAppId(); + try { + //acquire read lock + ApplicationHolder.acquireReadLock(); + //start the application monitor + startApplicationMonitor(appId); + } catch (Exception e) { + String msg = "Error processing event " + e.getLocalizedMessage(); + log.error(msg, e); + } finally { + //release read lock + ApplicationHolder.releaseReadLock(); + + } + } catch (ClassCastException e) { + String msg = "Error while casting the event " + 
e.getLocalizedMessage(); + log.error(msg, e); + } + + } + }); + + topologyEventReceiver.addEventListener(new ClusterActivatedEventListener() { + @Override + protected void onEvent(Event event) { + log.info("[ClusterActivatedEvent] Received: " + event.getClass()); + + ClusterActivatedEvent clusterActivatedEvent = (ClusterActivatedEvent) event; + String clusterId = clusterActivatedEvent.getClusterId(); + AutoscalerContext asCtx = AutoscalerContext.getInstance(); + AbstractClusterMonitor monitor; + monitor = asCtx.getClusterMonitor(clusterId); + if (null == monitor) { + if (log.isDebugEnabled()) { + log.debug(String.format("A cluster monitor is not found in autoscaler context " + + "[cluster] %s", clusterId)); + } + return; + } + //changing the status in the monitor, will notify its parent monitor + monitor.setStatus(ClusterStatus.Active); + } + }); + + topologyEventReceiver.addEventListener(new ClusterResetEventListener() { + @Override + protected void onEvent(Event event) { + + log.info("[ClusterCreatedEvent] Received: " + event.getClass()); + + ClusterCreatedEvent clusterCreatedEvent = (ClusterCreatedEvent) event; + String clusterId = clusterCreatedEvent.getCluster().getClusterId(); + AutoscalerContext asCtx = AutoscalerContext.getInstance(); + AbstractClusterMonitor monitor; + monitor = asCtx.getClusterMonitor(clusterId); + if (null == monitor) { + if (log.isDebugEnabled()) { + log.debug(String.format("A cluster monitor is not found in autoscaler context " + + "[cluster] %s", clusterId)); + } + return; + } + + //changing the status in the monitor, will notify its parent monitor + monitor.setStop(true); + monitor.setStatus(ClusterStatus.Created); + + } + }); + + topologyEventReceiver.addEventListener(new ClusterCreatedEventListener() { + @Override + protected void onEvent(Event event) { + log.info("[ClusterCreatedEvent] Received: " + event.getClass()); + } + }); + + topologyEventReceiver.addEventListener(new ClusterInActivateEventListener() { + @Override + 
protected void onEvent(Event event) { + log.info("[ClusterInActivateEvent] Received: " + event.getClass()); + + ClusterInactivateEvent clusterInactivateEvent = (ClusterInactivateEvent) event; + String clusterId = clusterInactivateEvent.getClusterId(); + AutoscalerContext asCtx = AutoscalerContext.getInstance(); + AbstractClusterMonitor monitor; + monitor = asCtx.getClusterMonitor(clusterId); + if (null == monitor) { + if (log.isDebugEnabled()) { + log.debug(String.format("A cluster monitor is not found in autoscaler context " + + "[cluster] %s", clusterId)); + } + return; + } + //changing the status in the monitor, will notify its parent monitor + monitor.setStatus(ClusterStatus.Inactive); + } + }); + + topologyEventReceiver.addEventListener(new ClusterTerminatingEventListener() { + @Override + protected void onEvent(Event event) { + + log.info("[ClusterTerminatingEvent] Received: " + event.getClass()); + + ClusterTerminatingEvent clusterTerminatingEvent = (ClusterTerminatingEvent) event; + String clusterId = clusterTerminatingEvent.getClusterId(); + AutoscalerContext asCtx = AutoscalerContext.getInstance(); + AbstractClusterMonitor monitor; + monitor = asCtx.getClusterMonitor(clusterId); + if (null == monitor) { + if (log.isDebugEnabled()) { + log.debug(String.format("A cluster monitor is not found in autoscaler context " + + "[cluster] %s", clusterId)); + } + return; + } + //changing the status in the monitor, will notify its parent monitor + if (monitor.getStatus() == ClusterStatus.Active) { + // terminated gracefully + monitor.setStatus(ClusterStatus.Terminating); + InstanceNotificationPublisher.sendInstanceCleanupEventForCluster(clusterId); + } else { + monitor.setStatus(ClusterStatus.Terminating); + monitor.terminateAllMembers(); + } + } + }); + + topologyEventReceiver.addEventListener(new ClusterTerminatedEventListener() { + @Override + protected void onEvent(Event event) { + log.info("[ClusterTerminatedEvent] Received: " + event.getClass()); + + 
ClusterTerminatedEvent clusterTerminatedEvent = (ClusterTerminatedEvent) event; + String clusterId = clusterTerminatedEvent.getClusterId(); + AutoscalerContext asCtx = AutoscalerContext.getInstance(); + AbstractClusterMonitor monitor; + monitor = asCtx.getClusterMonitor(clusterId); + if (null == monitor) { + if (log.isDebugEnabled()) { + log.debug(String.format("A cluster monitor is not found in autoscaler context " + + "[cluster] %s", clusterId)); + } + return; + } + + //changing the status in the monitor, will notify its parent monitor + monitor.setStatus(ClusterStatus.Terminated); + //Destroying and Removing the Cluster monitor + monitor.destroy(); + AutoscalerContext.getInstance().removeClusterMonitor(clusterId); + } + }); + + topologyEventReceiver.addEventListener(new MemberReadyToShutdownEventListener() { + @Override + protected void onEvent(Event event) { + try { + MemberReadyToShutdownEvent memberReadyToShutdownEvent = (MemberReadyToShutdownEvent) event; + String clusterId = memberReadyToShutdownEvent.getClusterId(); + AutoscalerContext asCtx = AutoscalerContext.getInstance(); + AbstractClusterMonitor monitor; + monitor = asCtx.getClusterMonitor(clusterId); + if (null == monitor) { + if (log.isDebugEnabled()) { + log.debug(String.format("A cluster monitor is not found in autoscaler context " + + "[cluster] %s", clusterId)); + } + return; + } + monitor.handleMemberReadyToShutdownEvent(memberReadyToShutdownEvent); + } catch (Exception e) { + String msg = "Error processing event " + e.getLocalizedMessage(); + log.error(msg, e); + } + } + }); + + + topologyEventReceiver.addEventListener(new MemberStartedEventListener() { + @Override + protected void onEvent(Event event) { + + } + + }); + + topologyEventReceiver.addEventListener(new MemberTerminatedEventListener() { + @Override + protected void onEvent(Event event) { + try { + MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event; + String clusterId = memberTerminatedEvent.getClusterId(); + 
AbstractClusterMonitor monitor; + AutoscalerContext asCtx = AutoscalerContext.getInstance(); + monitor = asCtx.getClusterMonitor(clusterId); + if (null == monitor) { + if (log.isDebugEnabled()) { + log.debug(String.format("A cluster monitor is not found in autoscaler context " + + "[cluster] %s", clusterId)); + } + return; + } + monitor.handleMemberTerminatedEvent(memberTerminatedEvent); + } catch (Exception e) { + String msg = "Error processing event " + e.getLocalizedMessage(); + log.error(msg, e); + } + } + }); + + topologyEventReceiver.addEventListener(new MemberActivatedEventListener() { + @Override + protected void onEvent(Event event) { + try { + MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event; + String clusterId = memberActivatedEvent.getClusterId(); + AbstractClusterMonitor monitor; + AutoscalerContext asCtx = AutoscalerContext.getInstance(); + monitor = asCtx.getClusterMonitor(clusterId); + if (null == monitor) { + if (log.isDebugEnabled()) { + log.debug(String.format("A cluster monitor is not found in autoscaler context " + + "[cluster] %s", clusterId)); + } + return; + } + monitor.handleMemberActivatedEvent(memberActivatedEvent); + } catch (Exception e) { + String msg = "Error processing event " + e.getLocalizedMessage(); + log.error(msg, e); + } + } + }); + + topologyEventReceiver.addEventListener(new MemberMaintenanceListener() { + @Override + protected void onEvent(Event event) { + try { + MemberMaintenanceModeEvent maintenanceModeEvent = (MemberMaintenanceModeEvent) event; + String clusterId = maintenanceModeEvent.getClusterId(); + AbstractClusterMonitor monitor; + AutoscalerContext asCtx = AutoscalerContext.getInstance(); + monitor = asCtx.getClusterMonitor(clusterId); + if (null == monitor) { + if (log.isDebugEnabled()) { + log.debug(String.format("A cluster monitor is not found in autoscaler context " + + "[cluster] %s", clusterId)); + } + return; + } + monitor.handleMemberMaintenanceModeEvent(maintenanceModeEvent); + } 
catch (Exception e) { + String msg = "Error processing event " + e.getLocalizedMessage(); + log.error(msg, e); + } + } + }); + } + + @SuppressWarnings("unused") + private void runTerminateAllRule(VMClusterMonitor monitor) { + + FactHandle terminateAllFactHandle = null; + + StatefulKnowledgeSession terminateAllKnowledgeSession = null; + + for (NetworkPartitionContext networkPartitionContext : monitor.getNetworkPartitionCtxts().values()) { + terminateAllFactHandle = AutoscalerRuleEvaluator.evaluateTerminateAll(terminateAllKnowledgeSession + , terminateAllFactHandle, networkPartitionContext); + } + + } + + /** + * Terminate load balancer topology receiver thread. + */ + public void terminate() { + topologyEventReceiver.terminate(); + terminated = true; + } + + protected synchronized void startApplicationMonitor(String applicationId) { + Thread th = null; + if (!AutoscalerContext.getInstance().appMonitorExist(applicationId)) { + th = new Thread( + new ApplicationMonitorAdder(applicationId)); + } + + if (th != null) { + th.start(); + // try { + // th.join(); + // } catch (InterruptedException ignore) { + + if (log.isDebugEnabled()) { + log.debug(String + .format("Application monitor thread has been started successfully: " + + "[application] %s ", applicationId)); + } + } else { + if (log.isDebugEnabled()) { + log.debug(String + .format("Application monitor thread already exists: " + + "[application] %s ", applicationId)); + } + } + } + + private class ApplicationMonitorAdder implements Runnable { + private String appId; + + public ApplicationMonitorAdder(String appId) { + this.appId = appId; + } + + public void run() { + ApplicationMonitor applicationMonitor = null; + int retries = 5; + boolean success = false; + do { + try { + Thread.sleep(5000); + } catch (InterruptedException e1) { + } + try { + long start = System.currentTimeMillis(); + if (log.isDebugEnabled()) { + log.debug("application monitor is going to be started for [application] " + + appId); + } + 
applicationMonitor = ApplicationMonitorFactory.getApplicationMonitor(appId); + + long end = System.currentTimeMillis(); + log.info("Time taken to start app monitor: " + (end - start) / 1000); + success = true; + } catch (DependencyBuilderException e) { + String msg = "Application monitor creation failed for Application: "; + log.warn(msg, e); + retries--; + } catch (TopologyInConsistentException e) { + String msg = "Application monitor creation failed for Application: "; + log.warn(msg, e); + retries--; + } + } while (!success && retries != 0); + + if (applicationMonitor == null) { + String msg = "Application monitor creation failed, even after retrying for 5 times, " + + "for Application: " + appId; + log.error(msg); + throw new RuntimeException(msg); + } + + AutoscalerContext.getInstance().addAppMonitor(applicationMonitor); + + if (log.isInfoEnabled()) { + log.info(String.format("Application monitor has been added successfully: " + + "[application] %s", applicationMonitor.getId())); + } + } + } + + +} http://git-wip-us.apache.org/repos/asf/stratos/blob/ed5feced/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/grouping/topic/ClusterStatusEventPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/grouping/topic/ClusterStatusEventPublisher.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/grouping/topic/ClusterStatusEventPublisher.java deleted file mode 100644 index 631a999..0000000 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/grouping/topic/ClusterStatusEventPublisher.java +++ /dev/null @@ -1,195 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.stratos.autoscaler.grouping.topic; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.messaging.broker.publish.EventPublisher; -import org.apache.stratos.messaging.broker.publish.EventPublisherPool; -import org.apache.stratos.messaging.domain.topology.Cluster; -import org.apache.stratos.messaging.domain.topology.ClusterStatus; -import org.apache.stratos.messaging.domain.topology.Service; -import org.apache.stratos.messaging.event.Event; -import org.apache.stratos.messaging.event.cluster.status.*; -import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; -import org.apache.stratos.messaging.util.Constants; - -/** - * This will publish cluster status events to cluster-status topic - */ -public class ClusterStatusEventPublisher { - private static final Log log = LogFactory.getLog(ClusterStatusEventPublisher.class); - - - public static void sendClusterCreatedEvent(String appId, String serviceName, String clusterId) { - try { - TopologyManager.acquireReadLockForCluster(serviceName, clusterId); - Service service = TopologyManager.getTopology().getService(serviceName); - if (service != null) { - Cluster cluster = service.getCluster(clusterId); - if 
(cluster.isStateTransitionValid(ClusterStatus.Created)) { - if (log.isInfoEnabled()) { - log.info("Publishing Cluster created event for [application]: " + appId + - " [cluster]: " + clusterId); - } - /*ClusterStatusClusterCreatedEvent clusterCreatedEvent = - new ClusterStatusClusterCreatedEvent(appId, serviceName, clusterId); - - publishEvent(clusterCreatedEvent);*/ - } else { - log.warn("Created is not in the possible state list of [cluster] " + clusterId); - } - } - } finally { - TopologyManager.releaseReadLockForCluster(serviceName, clusterId); - } - } - - public static void sendClusterResetEvent(String appId, String serviceName, String clusterId) { - try { - TopologyManager.acquireReadLockForCluster(serviceName, clusterId); - Service service = TopologyManager.getTopology().getService(serviceName); - if (service != null) { - Cluster cluster = service.getCluster(clusterId); - if (cluster.isStateTransitionValid(ClusterStatus.Created)) { - if (log.isInfoEnabled()) { - log.info("Publishing Cluster created event for [application]: " + appId + - " [cluster]: " + clusterId); - } - ClusterStatusClusterResetEvent clusterCreatedEvent = - new ClusterStatusClusterResetEvent(appId, serviceName, clusterId); - - publishEvent(clusterCreatedEvent); - } else { - log.warn("Created is not in the possible state list of [cluster] " + clusterId); - } - } - } finally { - TopologyManager.releaseReadLockForCluster(serviceName, clusterId); - } - } - - public static void sendClusterActivatedEvent(String appId, String serviceName, String clusterId) { - try { - TopologyManager.acquireReadLockForCluster(serviceName, clusterId); - Service service = TopologyManager.getTopology().getService(serviceName); - if (service != null) { - Cluster cluster = service.getCluster(clusterId); - if (cluster.isStateTransitionValid(ClusterStatus.Active)) { - if (log.isInfoEnabled()) { - log.info("Publishing Cluster activated event for [application]: " + appId + - " [cluster]: " + clusterId); - } - 
ClusterStatusClusterActivatedEvent clusterActivatedEvent = - new ClusterStatusClusterActivatedEvent(appId, serviceName, clusterId); - - publishEvent(clusterActivatedEvent); - } else { - log.warn("Active is not in the possible state list of [cluster] " + clusterId); - } - } - } finally { - TopologyManager.releaseReadLockForCluster(serviceName, clusterId); - } - } - - public static void sendClusterInActivateEvent(String appId, String serviceName, String clusterId) { - try { - TopologyManager.acquireReadLockForCluster(serviceName, clusterId); - Service service = TopologyManager.getTopology().getService(serviceName); - if (service != null) { - Cluster cluster = service.getCluster(clusterId); - if (cluster.isStateTransitionValid(ClusterStatus.Inactive)) { - if (log.isInfoEnabled()) { - log.info("Publishing Cluster in-activate event for [application]: " + appId + - " [cluster]: " + clusterId); - } - ClusterStatusClusterInactivateEvent clusterInActivateEvent = - new ClusterStatusClusterInactivateEvent(appId, serviceName, clusterId); - - publishEvent(clusterInActivateEvent); - } else { - log.warn("In-active is not in the possible state list of [cluster] " + clusterId); - } - } - } finally { - TopologyManager.releaseReadLockForCluster(serviceName, clusterId); - - } - } - - public static void sendClusterTerminatingEvent(String appId, String serviceName, String clusterId) { - - try { - TopologyManager.acquireReadLockForCluster(serviceName, clusterId); - Service service = TopologyManager.getTopology().getService(serviceName); - if (service != null) { - Cluster cluster = service.getCluster(clusterId); - if (cluster.isStateTransitionValid(ClusterStatus.Terminating)) { - if (log.isInfoEnabled()) { - log.info("Publishing Cluster Terminating event for [application]: " + appId + - " [cluster]: " + clusterId); - } - ClusterStatusClusterTerminatingEvent appStatusClusterTerminatingEvent = - new ClusterStatusClusterTerminatingEvent(appId, serviceName, clusterId); - - 
publishEvent(appStatusClusterTerminatingEvent); - } else { - log.warn("Terminating is not in the possible state list of [cluster] " + clusterId); - } - } - } finally { - TopologyManager.releaseReadLockForCluster(serviceName, clusterId); - - } - - } - - public static void sendClusterTerminatedEvent(String appId, String serviceName, String clusterId) { - try { - TopologyManager.acquireReadLockForCluster(serviceName, clusterId); - Service service = TopologyManager.getTopology().getService(serviceName); - if (service != null) { - Cluster cluster = service.getCluster(clusterId); - if (cluster.isStateTransitionValid(ClusterStatus.Terminated)) { - if (log.isInfoEnabled()) { - log.info("Publishing Cluster terminated event for [application]: " + appId + - " [cluster]: " + clusterId); - } - ClusterStatusClusterTerminatedEvent appStatusClusterTerminatedEvent = - new ClusterStatusClusterTerminatedEvent(appId, serviceName, clusterId); - - publishEvent(appStatusClusterTerminatedEvent); - } else { - log.warn("Terminated is not in the possible state list of [cluster] " + clusterId); - } - } - } finally { - TopologyManager.releaseReadLockForCluster(serviceName, clusterId); - - } - } - - - public static void publishEvent(Event event) { - //publishing events to application status topic - EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.CLUSTER_STATUS_TOPIC); - eventPublisher.publish(event); - } -} http://git-wip-us.apache.org/repos/asf/stratos/blob/ed5feced/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/grouping/topic/InstanceNotificationPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/grouping/topic/InstanceNotificationPublisher.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/grouping/topic/InstanceNotificationPublisher.java deleted file mode 
100644 index 80fa295..0000000 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/grouping/topic/InstanceNotificationPublisher.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.stratos.autoscaler.grouping.topic; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.messaging.broker.publish.EventPublisher; -import org.apache.stratos.messaging.broker.publish.EventPublisherPool; -import org.apache.stratos.messaging.event.Event; -import org.apache.stratos.messaging.event.instance.notifier.InstanceCleanupClusterEvent; -import org.apache.stratos.messaging.event.instance.notifier.InstanceCleanupMemberEvent; -import org.apache.stratos.messaging.util.Constants; - -public class InstanceNotificationPublisher { - private static final Log log = LogFactory.getLog(InstanceNotificationPublisher.class); - - private static void publish(Event event) { - EventPublisher instanceNotifyingEvent = EventPublisherPool.getPublisher(Constants.INSTANCE_NOTIFIER_TOPIC); - instanceNotifyingEvent.publish(event); - } - - public static void sendInstanceCleanupEventForCluster(String clusterId) { - log.info(String.format("Publishing Instance Cleanup Event: [cluster] %s", clusterId)); - publish(new InstanceCleanupClusterEvent(clusterId)); - } - - /** - * Publishing the instance termination notification to the instances - * - * @param memberId - */ - public void sendInstanceCleanupEventForMember(String memberId) { - log.info(String.format("Publishing Instance Cleanup Event: [member] %s", memberId)); - publish(new InstanceCleanupMemberEvent(memberId)); - } -} http://git-wip-us.apache.org/repos/asf/stratos/blob/ed5feced/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java index 
203d6e0..3da60ab 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java @@ -25,8 +25,8 @@ import org.apache.stratos.autoscaler.applications.ApplicationSynchronizerTaskSch import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy; import org.apache.stratos.autoscaler.exception.AutoScalerException; import org.apache.stratos.autoscaler.kubernetes.KubernetesManager; -import org.apache.stratos.autoscaler.message.receiver.health.AutoscalerHealthStatEventReceiver; -import org.apache.stratos.autoscaler.message.receiver.topology.AutoscalerTopologyEventReceiver; +import org.apache.stratos.autoscaler.event.receiver.health.AutoscalerHealthStatEventReceiver; +import org.apache.stratos.autoscaler.event.receiver.topology.AutoscalerTopologyEventReceiver; import org.apache.stratos.autoscaler.partition.PartitionManager; import org.apache.stratos.autoscaler.policy.PolicyManager; import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
