Added null check for appMonitor object in ClusterInstanceTerminatedEventListener. Fixed formatting and log messages.
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/d0f0f81b Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/d0f0f81b Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/d0f0f81b Branch: refs/heads/stratos-4.1.x Commit: d0f0f81bd8a6a142eb4eeda36276d3dac9ff44c5 Parents: d587011 Author: Akila Perera <[email protected]> Authored: Mon Nov 30 00:12:43 2015 +0530 Committer: Akila Perera <[email protected]> Committed: Mon Nov 30 00:33:46 2015 +0530 ---------------------------------------------------------------------- .../AutoscalerTopologyEventReceiver.java | 277 +++++++++---------- 1 file changed, 130 insertions(+), 147 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/d0f0f81b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java index 500b95a..8336f86 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java @@ -35,8 +35,6 @@ import org.apache.stratos.autoscaler.monitor.component.ApplicationMonitor; import org.apache.stratos.autoscaler.monitor.events.ClusterStatusEvent; import org.apache.stratos.autoscaler.util.AutoscalerUtil; import org.apache.stratos.autoscaler.util.ServiceReferenceHolder; -import org.apache.stratos.messaging.broker.publish.EventPublisher; -import org.apache.stratos.messaging.broker.publish.EventPublisherPool; import org.apache.stratos.messaging.domain.application.Application; import org.apache.stratos.messaging.domain.application.Applications; import org.apache.stratos.messaging.domain.instance.ClusterInstance; @@ -44,12 +42,10 @@ import org.apache.stratos.messaging.domain.topology.Cluster; import org.apache.stratos.messaging.domain.topology.ClusterStatus; import org.apache.stratos.messaging.domain.topology.Service; import org.apache.stratos.messaging.event.Event; -import org.apache.stratos.messaging.event.initializer.CompleteTopologyRequestEvent; import org.apache.stratos.messaging.event.topology.*; import org.apache.stratos.messaging.listener.topology.*; import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver; import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; -import org.apache.stratos.messaging.util.MessagingUtil; import java.util.concurrent.ExecutorService; @@ -57,9 +53,7 @@ import java.util.concurrent.ExecutorService; * Autoscaler topology receiver. */ public class AutoscalerTopologyEventReceiver { - private static final Log log = LogFactory.getLog(AutoscalerTopologyEventReceiver.class); - private TopologyEventReceiver topologyEventReceiver; private boolean terminated; private boolean topologyInitialized; @@ -72,10 +66,8 @@ public class AutoscalerTopologyEventReceiver { public void execute() { //FIXME this activated before autoscaler deployer activated. - topologyEventReceiver.setExecutorService(getExecutorService()); topologyEventReceiver.execute(); - if (log.isInfoEnabled()) { log.info("Autoscaler topology receiver thread started"); } @@ -87,29 +79,27 @@ public class AutoscalerTopologyEventReceiver { @Override protected void onEvent(Event event) { if (!topologyInitialized) { - log.info("[CompleteTopologyEvent] Received: " + event.getClass()); + log.info("[CompleteTopologyEvent] received: " + event.getClass()); try { ApplicationHolder.acquireReadLock(); Applications applications = ApplicationHolder.getApplications(); if (applications != null) { for (Application application : applications. getApplications().values()) { - ApplicationContext applicationContext = - AutoscalerContext.getInstance(). - getApplicationContext(application.getUniqueIdentifier()); + ApplicationContext applicationContext = AutoscalerContext.getInstance(). + getApplicationContext(application.getUniqueIdentifier()); if (applicationContext != null && applicationContext.getStatus(). equals(ApplicationContext.STATUS_DEPLOYED)) { if (AutoscalerUtil.allClustersInitialized(application)) { - AutoscalerUtil.getInstance().startApplicationMonitor( - application.getUniqueIdentifier()); + AutoscalerUtil.getInstance() + .startApplicationMonitor(application.getUniqueIdentifier()); } else { - log.error("Complete Topology is not consistent with " + - "the applications which got persisted"); + log.error("Complete Topology is not consistent with the applications which got " + + "persisted"); } } else { - log.info("The application is not yet " + - "deployed for this [application] " + - application.getUniqueIdentifier()); + log.info("The application is not yet deployed for this [application] " + application + .getUniqueIdentifier()); } } @@ -126,14 +116,13 @@ public class AutoscalerTopologyEventReceiver { } }); - topologyEventReceiver.addEventListener(new ApplicationClustersCreatedEventListener() { @Override protected void onEvent(Event event) { try { - log.info("[ApplicationClustersCreatedEvent] Received: " + event.getClass()); - ApplicationClustersCreatedEvent applicationClustersCreatedEvent = - (ApplicationClustersCreatedEvent) event; + log.info("[ApplicationClustersCreatedEvent] received: " + event.getClass()); + ApplicationClustersCreatedEvent applicationClustersCreatedEvent + = (ApplicationClustersCreatedEvent) event; String appId = applicationClustersCreatedEvent.getAppId(); try { //acquire read lock @@ -141,28 +130,26 @@ public class AutoscalerTopologyEventReceiver { //start the application monitor ApplicationContext applicationContext = AutoscalerContext.getInstance(). getApplicationContext(appId); - if (applicationContext != null && - applicationContext.getStatus(). - equals(ApplicationContext.STATUS_DEPLOYED)) { + if (applicationContext != null && applicationContext.getStatus(). + equals(ApplicationContext.STATUS_DEPLOYED)) { if (!AutoscalerContext.getInstance(). containsApplicationPendingMonitor(appId)) { AutoscalerUtil.getInstance().startApplicationMonitor(appId); } } else { String status; - if(applicationContext == null) { + if (applicationContext == null) { status = null; } else { status = applicationContext.getStatus(); } - log.error("Error while creating the application monitor due to " + - "in-consistent persistence of [application] " + - applicationClustersCreatedEvent.getAppId() + ", " + - "the [application-context] " + applicationContext + - " status of [application-context] " + status); + log.error(String.format( + "Error while creating the application monitor due to inconsistent persistence of " + + "[application] %s, [application-context] %s, [status] %s", + applicationClustersCreatedEvent.getAppId(), applicationContext, status)); } } catch (Exception e) { - String msg = "Error processing event " + e.getLocalizedMessage(); + String msg = "Error processing ApplicationClustersCreatedEvent: " + e.getLocalizedMessage(); log.error(msg, e); } finally { //release read lock @@ -179,7 +166,7 @@ public class AutoscalerTopologyEventReceiver { topologyEventReceiver.addEventListener(new ClusterInstanceActivatedEventListener() { @Override protected void onEvent(Event event) { - log.info("[ClusterActivatedEvent] Received: " + event.getClass()); + log.info("[ClusterActivatedEvent] received: " + event.getClass()); ClusterInstanceActivatedEvent clusterActivatedEvent = (ClusterInstanceActivatedEvent) event; String clusterId = clusterActivatedEvent.getClusterId(); String instanceId = clusterActivatedEvent.getInstanceId(); @@ -188,8 +175,8 @@ public class AutoscalerTopologyEventReceiver { monitor = asCtx.getClusterMonitor(clusterId); if (null == monitor) { if (log.isDebugEnabled()) { - log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); + log.debug(String.format("Cluster monitor is not found in autoscaler context [cluster-id] %s", + clusterId)); } return; } @@ -202,7 +189,7 @@ public class AutoscalerTopologyEventReceiver { topologyEventReceiver.addEventListener(new ClusterResetEventListener() { @Override protected void onEvent(Event event) { - log.info("[ClusterCreatedEvent] Received: " + event.getClass()); + log.info("[ClusterCreatedEvent] received: " + event.getClass()); ClusterResetEvent clusterResetEvent = (ClusterResetEvent) event; String clusterId = clusterResetEvent.getClusterId(); String instanceId = clusterResetEvent.getInstanceId(); @@ -211,8 +198,8 @@ public class AutoscalerTopologyEventReceiver { monitor = asCtx.getClusterMonitor(clusterId); if (null == monitor) { if (log.isDebugEnabled()) { - log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); + log.debug(String.format("Cluster monitor is not found in autoscaler context [cluster-id] %s", + clusterId)); } return; } @@ -226,14 +213,14 @@ public class AutoscalerTopologyEventReceiver { topologyEventReceiver.addEventListener(new ClusterCreatedEventListener() { @Override protected void onEvent(Event event) { - log.info("[ClusterCreatedEvent] Received: " + event.getClass()); + log.info("[ClusterCreatedEvent] received: " + event.getClass()); } }); topologyEventReceiver.addEventListener(new ClusterInstanceInactivateEventListener() { @Override protected void onEvent(Event event) { - log.info("[ClusterInactivateEvent] Received: " + event.getClass()); + log.info("[ClusterInactivateEvent] received: " + event.getClass()); ClusterInstanceInactivateEvent clusterInactivateEvent = (ClusterInstanceInactivateEvent) event; String clusterId = clusterInactivateEvent.getClusterId(); String instanceId = clusterInactivateEvent.getInstanceId(); @@ -242,8 +229,8 @@ public class AutoscalerTopologyEventReceiver { monitor = asCtx.getClusterMonitor(clusterId); if (null == monitor) { if (log.isDebugEnabled()) { - log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); + log.debug(String.format("Cluster monitor is not found in autoscaler context " + "[cluster] %s", + clusterId)); } return; } @@ -255,7 +242,7 @@ public class AutoscalerTopologyEventReceiver { topologyEventReceiver.addEventListener(new ClusterInstanceTerminatingEventListener() { @Override protected void onEvent(Event event) { - log.info("[ClusterTerminatingEvent] Received: " + event.getClass()); + log.info("[ClusterTerminatingEvent] received: " + event.getClass()); ClusterInstanceTerminatingEvent clusterTerminatingEvent = (ClusterInstanceTerminatingEvent) event; String clusterId = clusterTerminatingEvent.getClusterId(); String clusterInstanceId = clusterTerminatingEvent.getInstanceId(); @@ -264,8 +251,8 @@ public class AutoscalerTopologyEventReceiver { monitor = asCtx.getClusterMonitor(clusterId); if (null == monitor) { if (log.isDebugEnabled()) { - log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); + log.debug(String.format("Cluster monitor is not found in autoscaler context " + "[cluster] %s", + clusterId)); } // if monitor does not exist, send cluster terminated event ClusterStatusEventPublisher.sendClusterTerminatedEvent(clusterTerminatingEvent.getAppId(), @@ -280,11 +267,9 @@ public class AutoscalerTopologyEventReceiver { InstanceNotificationPublisher.getInstance(). sendInstanceCleanupEventForCluster(clusterId, clusterInstanceId); //Terminating the pending members - monitor.terminatePendingMembers(clusterInstanceId, - clusterInstance.getNetworkPartitionId()); + monitor.terminatePendingMembers(clusterInstanceId, clusterInstance.getNetworkPartitionId()); //Move all members to terminating pending list - monitor.moveMembersToTerminatingPending(clusterInstanceId, - clusterInstance.getNetworkPartitionId()); + monitor.moveMembersToTerminatingPending(clusterInstanceId, clusterInstance.getNetworkPartitionId()); } else { monitor.notifyParentMonitor(ClusterStatus.Terminating, clusterInstanceId); monitor.terminateAllMembers(clusterInstanceId, clusterInstance.getNetworkPartitionId()); @@ -297,7 +282,7 @@ public class AutoscalerTopologyEventReceiver { topologyEventReceiver.addEventListener(new ClusterInstanceTerminatedEventListener() { @Override protected void onEvent(Event event) { - log.info("[ClusterTerminatedEvent] Received: " + event.getClass()); + log.info("[ClusterTerminatedEvent] received: " + event.getClass()); ClusterInstanceTerminatedEvent clusterTerminatedEvent = (ClusterInstanceTerminatedEvent) event; String clusterId = clusterTerminatedEvent.getClusterId(); String instanceId = clusterTerminatedEvent.getInstanceId(); @@ -309,16 +294,15 @@ public class AutoscalerTopologyEventReceiver { getAppMonitor(clusterTerminatedEvent.getAppId()); if (null == monitor) { if (log.isDebugEnabled()) { - log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); + log.debug(String.format("Cluster monitor is not found in autoscaler context [cluster] %s", + clusterId)); } - // if the cluster monitor is null, assume that its termianted + // if the cluster monitor is null, assume that it is terminated appMonitor = AutoscalerContext.getInstance(). getAppMonitor(clusterTerminatedEvent.getAppId()); if (appMonitor != null && !appMonitor.isForce()) { appMonitor.onChildStatusEvent( - new ClusterStatusEvent(ClusterStatus.Terminated, - clusterId, instanceId)); + new ClusterStatusEvent(ClusterStatus.Terminated, clusterId, instanceId)); } return; } @@ -330,7 +314,7 @@ public class AutoscalerTopologyEventReceiver { getNetworkPartitionCtxt(instance.getNetworkPartitionId()). removeInstanceContext(instanceId); monitor.removeInstance(instanceId); - if (!monitor.hasInstance() && appMonitor.isTerminating()) { + if (!monitor.hasInstance() && (appMonitor != null && appMonitor.isTerminating())) { //Destroying and Removing the Cluster monitor monitor.destroy(); AutoscalerContext.getInstance().removeClusterMonitor(clusterId); @@ -343,7 +327,7 @@ public class AutoscalerTopologyEventReceiver { @Override protected void onEvent(Event event) { try { - log.info("[MemberReadyToShutdownEvent] Received: " + event.getClass()); + log.info("[MemberReadyToShutdownEvent] received: " + event.getClass()); MemberReadyToShutdownEvent memberReadyToShutdownEvent = (MemberReadyToShutdownEvent) event; String clusterId = memberReadyToShutdownEvent.getClusterId(); AutoscalerContext asCtx = AutoscalerContext.getInstance(); @@ -351,20 +335,19 @@ public class AutoscalerTopologyEventReceiver { monitor = asCtx.getClusterMonitor(clusterId); if (null == monitor) { if (log.isDebugEnabled()) { - log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); + log.debug(String.format( + "Cluster monitor is not found in autoscaler context " + "[cluster] %s", clusterId)); } return; } monitor.handleMemberReadyToShutdownEvent(memberReadyToShutdownEvent); } catch (Exception e) { - String msg = "Error processing event " + e.getLocalizedMessage(); + String msg = "Error processing MemberReadyToShutdownEvent: " + e.getLocalizedMessage(); log.error(msg, e); } } }); - topologyEventReceiver.addEventListener(new MemberStartedEventListener() { @Override protected void onEvent(Event event) { @@ -383,14 +366,14 @@ public class AutoscalerTopologyEventReceiver { monitor = asCtx.getClusterMonitor(clusterId); if (null == monitor) { if (log.isDebugEnabled()) { - log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); + log.debug(String.format( + "Cluster monitor is not found in autoscaler context " + "[cluster] %s", clusterId)); } return; } monitor.handleMemberTerminatedEvent(memberTerminatedEvent); } catch (Exception e) { - String msg = "Error processing event " + e.getLocalizedMessage(); + String msg = "Error processing MemberTerminatedEvent: " + e.getLocalizedMessage(); log.error(msg, e); } } @@ -408,14 +391,14 @@ public class AutoscalerTopologyEventReceiver { monitor = asCtx.getClusterMonitor(clusterId); if (null == monitor) { if (log.isDebugEnabled()) { - log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); + log.debug(String.format( + "Cluster monitor is not found in autoscaler context " + "[cluster] %s", clusterId)); } return; } monitor.handleMemberActivatedEvent(memberActivatedEvent); } catch (Exception e) { - String msg = "Error processing event " + e.getLocalizedMessage(); + String msg = "Error processing MemberActivatedEvent: " + e.getLocalizedMessage(); log.error(msg, e); } } @@ -432,101 +415,101 @@ public class AutoscalerTopologyEventReceiver { monitor = asCtx.getClusterMonitor(clusterId); if (null == monitor) { if (log.isDebugEnabled()) { - log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); + log.debug(String.format("Cluster monitor is not found in autoscaler context [cluster] %s", + clusterId)); } return; } monitor.handleMemberMaintenanceModeEvent(maintenanceModeEvent); } catch (Exception e) { - String msg = "Error processing event " + e.getLocalizedMessage(); + String msg = "Error processing MemberMaintenanceModeEvent: " + e.getLocalizedMessage(); log.error(msg, e); } } }); topologyEventReceiver.addEventListener(new ClusterInstanceCreatedEventListener() { - @Override - protected void onEvent(Event event) { - - ClusterInstanceCreatedEvent clusterInstanceCreatedEvent = - (ClusterInstanceCreatedEvent) event; - ClusterMonitor clusterMonitor = AutoscalerContext.getInstance(). - getClusterMonitor(clusterInstanceCreatedEvent.getClusterId()); - ClusterInstance clusterInstance = ((ClusterInstanceCreatedEvent) event). - getClusterInstance(); - String instanceId = clusterInstance.getInstanceId(); - //FIXME to take lock when clusterMonitor is running - if (clusterMonitor != null) { - TopologyManager.acquireReadLockForCluster(clusterInstanceCreatedEvent.getServiceName(), - clusterInstanceCreatedEvent.getClusterId()); - - try { - Service service = TopologyManager.getTopology(). - getService(clusterInstanceCreatedEvent.getServiceName()); - - if (service != null) { - Cluster cluster = service.getCluster(clusterInstanceCreatedEvent.getClusterId()); - if (cluster != null) { - try { - ClusterContext clusterContext = - (ClusterContext) clusterMonitor.getClusterContext(); - if (clusterContext == null) { - clusterContext = ClusterContextFactory.getVMClusterContext(instanceId, cluster, - clusterMonitor.hasScalingDependents(), clusterMonitor.getDeploymentPolicyId()); - clusterMonitor.setClusterContext(clusterContext); - - } - log.info(" Cluster monitor has scaling dependents" - + " [" + clusterMonitor.hasScalingDependents() + "] "); // TODO -- remove this log.. - clusterContext.addInstanceContext(instanceId, cluster, - clusterMonitor.hasScalingDependents(), clusterMonitor.groupScalingEnabledSubtree()); - if (clusterMonitor.getInstance(instanceId) == null) { - // adding the same instance in topology to monitor as a reference - ClusterInstance clusterInstance1 = cluster.getInstanceContexts(instanceId); - clusterMonitor.addInstance(clusterInstance1); - } - - if (clusterMonitor.hasMonitoringStarted().compareAndSet(false, true)) { - clusterMonitor.startScheduler(); - log.info("Monitoring task for Cluster Monitor with cluster id " - + clusterInstanceCreatedEvent.getClusterId() + " started successfully"); - } else { - //monitor already started. Invoking it directly to speed up the process - ((ClusterMonitor) clusterMonitor).monitor(); - } - } catch (PolicyValidationException e) { - log.error(e.getMessage(), e); - } catch (PartitionValidationException e) { - log.error(e.getMessage(), e); - } - } - - } else { - log.error("Service " + clusterInstanceCreatedEvent.getServiceName() + - " not found, no cluster instance added to ClusterMonitor " + - clusterInstanceCreatedEvent.getClusterId()); - } - - } finally { - TopologyManager.releaseReadLockForCluster(clusterInstanceCreatedEvent.getServiceName(), - clusterInstanceCreatedEvent.getClusterId()); - } - - } else { - log.error("No Cluster Monitor found for cluster id " + - clusterInstanceCreatedEvent.getClusterId()); - } - } - } - - ); + @Override + protected void onEvent(Event event) { + + ClusterInstanceCreatedEvent clusterInstanceCreatedEvent = (ClusterInstanceCreatedEvent) event; + ClusterMonitor clusterMonitor = AutoscalerContext.getInstance(). + getClusterMonitor(clusterInstanceCreatedEvent.getClusterId()); + ClusterInstance clusterInstance = ((ClusterInstanceCreatedEvent) event). + getClusterInstance(); + String instanceId = clusterInstance.getInstanceId(); + //FIXME to take lock when clusterMonitor is running + if (clusterMonitor != null) { + TopologyManager.acquireReadLockForCluster(clusterInstanceCreatedEvent.getServiceName(), + clusterInstanceCreatedEvent.getClusterId()); + + try { + Service service = TopologyManager.getTopology(). + getService(clusterInstanceCreatedEvent.getServiceName()); + + if (service != null) { + Cluster cluster = service.getCluster(clusterInstanceCreatedEvent.getClusterId()); + if (cluster != null) { + try { + ClusterContext clusterContext = (ClusterContext) clusterMonitor.getClusterContext(); + if (clusterContext == null) { + clusterContext = ClusterContextFactory.getVMClusterContext(instanceId, cluster, + clusterMonitor.hasScalingDependents(), + clusterMonitor.getDeploymentPolicyId()); + clusterMonitor.setClusterContext(clusterContext); + + } + log.info(String.format("Cluster monitor has scaling dependents: [%s]", + Boolean.toString(clusterMonitor.hasScalingDependents()))); + // TODO -- remove this log.. + clusterContext.addInstanceContext(instanceId, cluster, + clusterMonitor.hasScalingDependents(), + clusterMonitor.groupScalingEnabledSubtree()); + if (clusterMonitor.getInstance(instanceId) == null) { + // adding the same instance in + // topology to monitor as a reference + ClusterInstance clusterInstance1 = cluster.getInstanceContexts(instanceId); + clusterMonitor.addInstance(clusterInstance1); + } + + if (clusterMonitor.hasMonitoringStarted().compareAndSet(false, true)) { + clusterMonitor.startScheduler(); + log.info(String.format( + "Monitoring task for Cluster Monitor with [cluster-id] %s started " + + "successfully", clusterInstanceCreatedEvent.getClusterId())); + } else { + //monitor already started. Invoking it + // directly to speed up the process + ((ClusterMonitor) clusterMonitor).monitor(); + } + } catch (PolicyValidationException e) { + log.error(e.getMessage(), e); + } catch (PartitionValidationException e) { + log.error(e.getMessage(), e); + } + } + + } else { + log.error(String.format("Service %s not found, no cluster instance added to ClusterMonitor", + clusterInstanceCreatedEvent.getServiceName())); + } + + } finally { + TopologyManager.releaseReadLockForCluster(clusterInstanceCreatedEvent.getServiceName(), + clusterInstanceCreatedEvent.getClusterId()); + } + + } else { + log.error(String.format("No Cluster Monitor found for [cluster-id] %s", + clusterInstanceCreatedEvent.getClusterId())); + } + } + }); } /** * Terminate load balancer topology receiver thread. */ - public void terminate() { topologyEventReceiver.terminate(); terminated = true;
