Added null check for appMonitor object in 
ClusterInstanceTerminatedEventListener. Fixed formatting and log messages.


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/d0f0f81b
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/d0f0f81b
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/d0f0f81b

Branch: refs/heads/stratos-4.1.x
Commit: d0f0f81bd8a6a142eb4eeda36276d3dac9ff44c5
Parents: d587011
Author: Akila Perera <[email protected]>
Authored: Mon Nov 30 00:12:43 2015 +0530
Committer: Akila Perera <[email protected]>
Committed: Mon Nov 30 00:33:46 2015 +0530

----------------------------------------------------------------------
 .../AutoscalerTopologyEventReceiver.java        | 277 +++++++++----------
 1 file changed, 130 insertions(+), 147 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/d0f0f81b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
index 500b95a..8336f86 100644
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
@@ -35,8 +35,6 @@ import 
org.apache.stratos.autoscaler.monitor.component.ApplicationMonitor;
 import org.apache.stratos.autoscaler.monitor.events.ClusterStatusEvent;
 import org.apache.stratos.autoscaler.util.AutoscalerUtil;
 import org.apache.stratos.autoscaler.util.ServiceReferenceHolder;
-import org.apache.stratos.messaging.broker.publish.EventPublisher;
-import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
 import org.apache.stratos.messaging.domain.application.Application;
 import org.apache.stratos.messaging.domain.application.Applications;
 import org.apache.stratos.messaging.domain.instance.ClusterInstance;
@@ -44,12 +42,10 @@ import org.apache.stratos.messaging.domain.topology.Cluster;
 import org.apache.stratos.messaging.domain.topology.ClusterStatus;
 import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.event.Event;
-import 
org.apache.stratos.messaging.event.initializer.CompleteTopologyRequestEvent;
 import org.apache.stratos.messaging.event.topology.*;
 import org.apache.stratos.messaging.listener.topology.*;
 import 
org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver;
 import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
-import org.apache.stratos.messaging.util.MessagingUtil;
 
 import java.util.concurrent.ExecutorService;
 
@@ -57,9 +53,7 @@ import java.util.concurrent.ExecutorService;
  * Autoscaler topology receiver.
  */
 public class AutoscalerTopologyEventReceiver {
-
     private static final Log log = 
LogFactory.getLog(AutoscalerTopologyEventReceiver.class);
-
     private TopologyEventReceiver topologyEventReceiver;
     private boolean terminated;
     private boolean topologyInitialized;
@@ -72,10 +66,8 @@ public class AutoscalerTopologyEventReceiver {
 
     public void execute() {
         //FIXME this activated before autoscaler deployer activated.
-
         topologyEventReceiver.setExecutorService(getExecutorService());
         topologyEventReceiver.execute();
-
         if (log.isInfoEnabled()) {
             log.info("Autoscaler topology receiver thread started");
         }
@@ -87,29 +79,27 @@ public class AutoscalerTopologyEventReceiver {
             @Override
             protected void onEvent(Event event) {
                 if (!topologyInitialized) {
-                    log.info("[CompleteTopologyEvent] Received: " + 
event.getClass());
+                    log.info("[CompleteTopologyEvent] received: " + 
event.getClass());
                     try {
                         ApplicationHolder.acquireReadLock();
                         Applications applications = 
ApplicationHolder.getApplications();
                         if (applications != null) {
                             for (Application application : applications.
                                     getApplications().values()) {
-                                ApplicationContext applicationContext =
-                                        AutoscalerContext.getInstance().
-                                                
getApplicationContext(application.getUniqueIdentifier());
+                                ApplicationContext applicationContext = 
AutoscalerContext.getInstance().
+                                        
getApplicationContext(application.getUniqueIdentifier());
                                 if (applicationContext != null && 
applicationContext.getStatus().
                                         
equals(ApplicationContext.STATUS_DEPLOYED)) {
                                     if 
(AutoscalerUtil.allClustersInitialized(application)) {
-                                        
AutoscalerUtil.getInstance().startApplicationMonitor(
-                                                
application.getUniqueIdentifier());
+                                        AutoscalerUtil.getInstance()
+                                                
.startApplicationMonitor(application.getUniqueIdentifier());
                                     } else {
-                                        log.error("Complete Topology is not 
consistent with " +
-                                                "the applications which got 
persisted");
+                                        log.error("Complete Topology is not 
consistent with the applications which got "
+                                                + "persisted");
                                     }
                                 } else {
-                                    log.info("The application is not yet " +
-                                            "deployed for this [application] " 
+
-                                            application.getUniqueIdentifier());
+                                    log.info("The application is not yet 
deployed for this [application] " + application
+                                            .getUniqueIdentifier());
                                 }
 
                             }
@@ -126,14 +116,13 @@ public class AutoscalerTopologyEventReceiver {
             }
         });
 
-
         topologyEventReceiver.addEventListener(new 
ApplicationClustersCreatedEventListener() {
             @Override
             protected void onEvent(Event event) {
                 try {
-                    log.info("[ApplicationClustersCreatedEvent] Received: " + 
event.getClass());
-                    ApplicationClustersCreatedEvent 
applicationClustersCreatedEvent =
-                            (ApplicationClustersCreatedEvent) event;
+                    log.info("[ApplicationClustersCreatedEvent] received: " + 
event.getClass());
+                    ApplicationClustersCreatedEvent 
applicationClustersCreatedEvent
+                            = (ApplicationClustersCreatedEvent) event;
                     String appId = applicationClustersCreatedEvent.getAppId();
                     try {
                         //acquire read lock
@@ -141,28 +130,26 @@ public class AutoscalerTopologyEventReceiver {
                         //start the application monitor
                         ApplicationContext applicationContext = 
AutoscalerContext.getInstance().
                                 getApplicationContext(appId);
-                        if (applicationContext != null &&
-                                applicationContext.getStatus().
-                                        
equals(ApplicationContext.STATUS_DEPLOYED)) {
+                        if (applicationContext != null && 
applicationContext.getStatus().
+                                equals(ApplicationContext.STATUS_DEPLOYED)) {
                             if (!AutoscalerContext.getInstance().
                                     containsApplicationPendingMonitor(appId)) {
                                 
AutoscalerUtil.getInstance().startApplicationMonitor(appId);
                             }
                         } else {
                             String status;
-                            if(applicationContext == null) {
+                            if (applicationContext == null) {
                                 status = null;
                             } else {
                                 status = applicationContext.getStatus();
                             }
-                            log.error("Error while creating the application 
monitor due to " +
-                                    "in-consistent persistence of 
[application] " +
-                                    applicationClustersCreatedEvent.getAppId() 
+ ", " +
-                                    "the [application-context] " + 
applicationContext +
-                            " status of [application-context] " + status);
+                            log.error(String.format(
+                                    "Error while creating the application 
monitor due to inconsistent persistence of "
+                                            + "[application] %s, 
[application-context] %s, [status] %s",
+                                    
applicationClustersCreatedEvent.getAppId(), applicationContext, status));
                         }
                     } catch (Exception e) {
-                        String msg = "Error processing event " + 
e.getLocalizedMessage();
+                        String msg = "Error processing 
ApplicationClustersCreatedEvent: " + e.getLocalizedMessage();
                         log.error(msg, e);
                     } finally {
                         //release read lock
@@ -179,7 +166,7 @@ public class AutoscalerTopologyEventReceiver {
         topologyEventReceiver.addEventListener(new 
ClusterInstanceActivatedEventListener() {
             @Override
             protected void onEvent(Event event) {
-                log.info("[ClusterActivatedEvent] Received: " + 
event.getClass());
+                log.info("[ClusterActivatedEvent] received: " + 
event.getClass());
                 ClusterInstanceActivatedEvent clusterActivatedEvent = 
(ClusterInstanceActivatedEvent) event;
                 String clusterId = clusterActivatedEvent.getClusterId();
                 String instanceId = clusterActivatedEvent.getInstanceId();
@@ -188,8 +175,8 @@ public class AutoscalerTopologyEventReceiver {
                 monitor = asCtx.getClusterMonitor(clusterId);
                 if (null == monitor) {
                     if (log.isDebugEnabled()) {
-                        log.debug(String.format("A cluster monitor is not 
found in autoscaler context "
-                                + "[cluster] %s", clusterId));
+                        log.debug(String.format("Cluster monitor is not found 
in autoscaler context [cluster-id] %s",
+                                clusterId));
                     }
                     return;
                 }
@@ -202,7 +189,7 @@ public class AutoscalerTopologyEventReceiver {
         topologyEventReceiver.addEventListener(new ClusterResetEventListener() 
{
             @Override
             protected void onEvent(Event event) {
-                log.info("[ClusterCreatedEvent] Received: " + 
event.getClass());
+                log.info("[ClusterCreatedEvent] received: " + 
event.getClass());
                 ClusterResetEvent clusterResetEvent = (ClusterResetEvent) 
event;
                 String clusterId = clusterResetEvent.getClusterId();
                 String instanceId = clusterResetEvent.getInstanceId();
@@ -211,8 +198,8 @@ public class AutoscalerTopologyEventReceiver {
                 monitor = asCtx.getClusterMonitor(clusterId);
                 if (null == monitor) {
                     if (log.isDebugEnabled()) {
-                        log.debug(String.format("A cluster monitor is not 
found in autoscaler context "
-                                + "[cluster] %s", clusterId));
+                        log.debug(String.format("Cluster monitor is not found 
in autoscaler context [cluster-id] %s",
+                                clusterId));
                     }
                     return;
                 }
@@ -226,14 +213,14 @@ public class AutoscalerTopologyEventReceiver {
         topologyEventReceiver.addEventListener(new 
ClusterCreatedEventListener() {
             @Override
             protected void onEvent(Event event) {
-                log.info("[ClusterCreatedEvent] Received: " + 
event.getClass());
+                log.info("[ClusterCreatedEvent] received: " + 
event.getClass());
             }
         });
 
         topologyEventReceiver.addEventListener(new 
ClusterInstanceInactivateEventListener() {
             @Override
             protected void onEvent(Event event) {
-                log.info("[ClusterInactivateEvent] Received: " + 
event.getClass());
+                log.info("[ClusterInactivateEvent] received: " + 
event.getClass());
                 ClusterInstanceInactivateEvent clusterInactivateEvent = 
(ClusterInstanceInactivateEvent) event;
                 String clusterId = clusterInactivateEvent.getClusterId();
                 String instanceId = clusterInactivateEvent.getInstanceId();
@@ -242,8 +229,8 @@ public class AutoscalerTopologyEventReceiver {
                 monitor = asCtx.getClusterMonitor(clusterId);
                 if (null == monitor) {
                     if (log.isDebugEnabled()) {
-                        log.debug(String.format("A cluster monitor is not 
found in autoscaler context "
-                                + "[cluster] %s", clusterId));
+                        log.debug(String.format("Cluster monitor is not found 
in autoscaler context " + "[cluster] %s",
+                                clusterId));
                     }
                     return;
                 }
@@ -255,7 +242,7 @@ public class AutoscalerTopologyEventReceiver {
         topologyEventReceiver.addEventListener(new 
ClusterInstanceTerminatingEventListener() {
             @Override
             protected void onEvent(Event event) {
-                log.info("[ClusterTerminatingEvent] Received: " + 
event.getClass());
+                log.info("[ClusterTerminatingEvent] received: " + 
event.getClass());
                 ClusterInstanceTerminatingEvent clusterTerminatingEvent = 
(ClusterInstanceTerminatingEvent) event;
                 String clusterId = clusterTerminatingEvent.getClusterId();
                 String clusterInstanceId = 
clusterTerminatingEvent.getInstanceId();
@@ -264,8 +251,8 @@ public class AutoscalerTopologyEventReceiver {
                 monitor = asCtx.getClusterMonitor(clusterId);
                 if (null == monitor) {
                     if (log.isDebugEnabled()) {
-                        log.debug(String.format("A cluster monitor is not 
found in autoscaler context "
-                                + "[cluster] %s", clusterId));
+                        log.debug(String.format("Cluster monitor is not found 
in autoscaler context " + "[cluster] %s",
+                                clusterId));
                     }
                     // if monitor does not exist, send cluster terminated event
                     
ClusterStatusEventPublisher.sendClusterTerminatedEvent(clusterTerminatingEvent.getAppId(),
@@ -280,11 +267,9 @@ public class AutoscalerTopologyEventReceiver {
                     InstanceNotificationPublisher.getInstance().
                             sendInstanceCleanupEventForCluster(clusterId, 
clusterInstanceId);
                     //Terminating the pending members
-                    monitor.terminatePendingMembers(clusterInstanceId,
-                            clusterInstance.getNetworkPartitionId());
+                    monitor.terminatePendingMembers(clusterInstanceId, 
clusterInstance.getNetworkPartitionId());
                     //Move all members to terminating pending list
-                    monitor.moveMembersToTerminatingPending(clusterInstanceId,
-                            clusterInstance.getNetworkPartitionId());
+                    monitor.moveMembersToTerminatingPending(clusterInstanceId, 
clusterInstance.getNetworkPartitionId());
                 } else {
                     monitor.notifyParentMonitor(ClusterStatus.Terminating, 
clusterInstanceId);
                     monitor.terminateAllMembers(clusterInstanceId, 
clusterInstance.getNetworkPartitionId());
@@ -297,7 +282,7 @@ public class AutoscalerTopologyEventReceiver {
         topologyEventReceiver.addEventListener(new 
ClusterInstanceTerminatedEventListener() {
             @Override
             protected void onEvent(Event event) {
-                log.info("[ClusterTerminatedEvent] Received: " + 
event.getClass());
+                log.info("[ClusterTerminatedEvent] received: " + 
event.getClass());
                 ClusterInstanceTerminatedEvent clusterTerminatedEvent = 
(ClusterInstanceTerminatedEvent) event;
                 String clusterId = clusterTerminatedEvent.getClusterId();
                 String instanceId = clusterTerminatedEvent.getInstanceId();
@@ -309,16 +294,15 @@ public class AutoscalerTopologyEventReceiver {
                         getAppMonitor(clusterTerminatedEvent.getAppId());
                 if (null == monitor) {
                     if (log.isDebugEnabled()) {
-                        log.debug(String.format("A cluster monitor is not 
found in autoscaler context "
-                                + "[cluster] %s", clusterId));
+                        log.debug(String.format("Cluster monitor is not found 
in autoscaler context [cluster] %s",
+                                clusterId));
                     }
-                    // if the cluster monitor is null, assume that its 
termianted
+                    // if the cluster monitor is null, assume that it is 
terminated
                     appMonitor = AutoscalerContext.getInstance().
                             getAppMonitor(clusterTerminatedEvent.getAppId());
                     if (appMonitor != null && !appMonitor.isForce()) {
                         appMonitor.onChildStatusEvent(
-                                new 
ClusterStatusEvent(ClusterStatus.Terminated,
-                                        clusterId, instanceId));
+                                new 
ClusterStatusEvent(ClusterStatus.Terminated, clusterId, instanceId));
                     }
                     return;
                 }
@@ -330,7 +314,7 @@ public class AutoscalerTopologyEventReceiver {
                         
getNetworkPartitionCtxt(instance.getNetworkPartitionId()).
                         removeInstanceContext(instanceId);
                 monitor.removeInstance(instanceId);
-                if (!monitor.hasInstance() && appMonitor.isTerminating()) {
+                if (!monitor.hasInstance() && (appMonitor != null && 
appMonitor.isTerminating())) {
                     //Destroying and Removing the Cluster monitor
                     monitor.destroy();
                     
AutoscalerContext.getInstance().removeClusterMonitor(clusterId);
@@ -343,7 +327,7 @@ public class AutoscalerTopologyEventReceiver {
             @Override
             protected void onEvent(Event event) {
                 try {
-                    log.info("[MemberReadyToShutdownEvent] Received: " + 
event.getClass());
+                    log.info("[MemberReadyToShutdownEvent] received: " + 
event.getClass());
                     MemberReadyToShutdownEvent memberReadyToShutdownEvent = 
(MemberReadyToShutdownEvent) event;
                     String clusterId = 
memberReadyToShutdownEvent.getClusterId();
                     AutoscalerContext asCtx = AutoscalerContext.getInstance();
@@ -351,20 +335,19 @@ public class AutoscalerTopologyEventReceiver {
                     monitor = asCtx.getClusterMonitor(clusterId);
                     if (null == monitor) {
                         if (log.isDebugEnabled()) {
-                            log.debug(String.format("A cluster monitor is not 
found in autoscaler context "
-                                    + "[cluster] %s", clusterId));
+                            log.debug(String.format(
+                                    "Cluster monitor is not found in 
autoscaler context " + "[cluster] %s", clusterId));
                         }
                         return;
                     }
                     
monitor.handleMemberReadyToShutdownEvent(memberReadyToShutdownEvent);
                 } catch (Exception e) {
-                    String msg = "Error processing event " + 
e.getLocalizedMessage();
+                    String msg = "Error processing MemberReadyToShutdownEvent: 
" + e.getLocalizedMessage();
                     log.error(msg, e);
                 }
             }
         });
 
-
         topologyEventReceiver.addEventListener(new 
MemberStartedEventListener() {
             @Override
             protected void onEvent(Event event) {
@@ -383,14 +366,14 @@ public class AutoscalerTopologyEventReceiver {
                     monitor = asCtx.getClusterMonitor(clusterId);
                     if (null == monitor) {
                         if (log.isDebugEnabled()) {
-                            log.debug(String.format("A cluster monitor is not 
found in autoscaler context "
-                                    + "[cluster] %s", clusterId));
+                            log.debug(String.format(
+                                    "Cluster monitor is not found in 
autoscaler context " + "[cluster] %s", clusterId));
                         }
                         return;
                     }
                     monitor.handleMemberTerminatedEvent(memberTerminatedEvent);
                 } catch (Exception e) {
-                    String msg = "Error processing event " + 
e.getLocalizedMessage();
+                    String msg = "Error processing MemberTerminatedEvent: " + 
e.getLocalizedMessage();
                     log.error(msg, e);
                 }
             }
@@ -408,14 +391,14 @@ public class AutoscalerTopologyEventReceiver {
                     monitor = asCtx.getClusterMonitor(clusterId);
                     if (null == monitor) {
                         if (log.isDebugEnabled()) {
-                            log.debug(String.format("A cluster monitor is not 
found in autoscaler context "
-                                    + "[cluster] %s", clusterId));
+                            log.debug(String.format(
+                                    "Cluster monitor is not found in 
autoscaler context " + "[cluster] %s", clusterId));
                         }
                         return;
                     }
                     monitor.handleMemberActivatedEvent(memberActivatedEvent);
                 } catch (Exception e) {
-                    String msg = "Error processing event " + 
e.getLocalizedMessage();
+                    String msg = "Error processing MemberActivatedEvent: " + 
e.getLocalizedMessage();
                     log.error(msg, e);
                 }
             }
@@ -432,101 +415,101 @@ public class AutoscalerTopologyEventReceiver {
                     monitor = asCtx.getClusterMonitor(clusterId);
                     if (null == monitor) {
                         if (log.isDebugEnabled()) {
-                            log.debug(String.format("A cluster monitor is not 
found in autoscaler context "
-                                    + "[cluster] %s", clusterId));
+                            log.debug(String.format("Cluster monitor is not 
found in autoscaler context [cluster] %s",
+                                    clusterId));
                         }
                         return;
                     }
                     
monitor.handleMemberMaintenanceModeEvent(maintenanceModeEvent);
                 } catch (Exception e) {
-                    String msg = "Error processing event " + 
e.getLocalizedMessage();
+                    String msg = "Error processing MemberMaintenanceModeEvent: 
" + e.getLocalizedMessage();
                     log.error(msg, e);
                 }
             }
         });
 
         topologyEventReceiver.addEventListener(new 
ClusterInstanceCreatedEventListener() {
-                                                   @Override
-                                                   protected void 
onEvent(Event event) {
-
-                                                       
ClusterInstanceCreatedEvent clusterInstanceCreatedEvent =
-                                                               
(ClusterInstanceCreatedEvent) event;
-                                                       ClusterMonitor 
clusterMonitor = AutoscalerContext.getInstance().
-                                                               
getClusterMonitor(clusterInstanceCreatedEvent.getClusterId());
-                                                       ClusterInstance 
clusterInstance = ((ClusterInstanceCreatedEvent) event).
-                                                               
getClusterInstance();
-                                                       String instanceId = 
clusterInstance.getInstanceId();
-                                                       //FIXME to take lock 
when clusterMonitor is running
-                                                       if (clusterMonitor != 
null) {
-                                                           
TopologyManager.acquireReadLockForCluster(clusterInstanceCreatedEvent.getServiceName(),
-                                                                   
clusterInstanceCreatedEvent.getClusterId());
-
-                                                           try {
-                                                               Service service 
= TopologyManager.getTopology().
-                                                                       
getService(clusterInstanceCreatedEvent.getServiceName());
-
-                                                               if (service != 
null) {
-                                                                   Cluster 
cluster = service.getCluster(clusterInstanceCreatedEvent.getClusterId());
-                                                                   if (cluster 
!= null) {
-                                                                       try {
-                                                                           
ClusterContext clusterContext =
-                                                                               
    (ClusterContext) clusterMonitor.getClusterContext();
-                                                                           if 
(clusterContext == null) {
-                                                                               
clusterContext = ClusterContextFactory.getVMClusterContext(instanceId, cluster,
-                                                                               
        clusterMonitor.hasScalingDependents(), 
clusterMonitor.getDeploymentPolicyId());
-                                                                               
clusterMonitor.setClusterContext(clusterContext);
-
-                                                                           }
-                                                                           
log.info(" Cluster monitor has scaling dependents"
-                                                                               
    + "  [" + clusterMonitor.hasScalingDependents() + "] "); // TODO -- remove 
this log..
-                                                                           
clusterContext.addInstanceContext(instanceId, cluster,
-                                                                               
    clusterMonitor.hasScalingDependents(), 
clusterMonitor.groupScalingEnabledSubtree());
-                                                                           if 
(clusterMonitor.getInstance(instanceId) == null) {
-                                                                               
// adding the same instance in topology to monitor as a reference
-                                                                               
ClusterInstance clusterInstance1 = cluster.getInstanceContexts(instanceId);
-                                                                               
clusterMonitor.addInstance(clusterInstance1);
-                                                                           }
-
-                                                                           if 
(clusterMonitor.hasMonitoringStarted().compareAndSet(false, true)) {
-                                                                               
clusterMonitor.startScheduler();
-                                                                               
log.info("Monitoring task for Cluster Monitor with cluster id "
-                                                                               
        + clusterInstanceCreatedEvent.getClusterId() + " started successfully");
-                                                                           } 
else {
-                                                                               
//monitor already started. Invoking it directly to speed up the process
-                                                                               
((ClusterMonitor) clusterMonitor).monitor();
-                                                                           }
-                                                                       } catch 
(PolicyValidationException e) {
-                                                                           
log.error(e.getMessage(), e);
-                                                                       } catch 
(PartitionValidationException e) {
-                                                                           
log.error(e.getMessage(), e);
-                                                                       }
-                                                                   }
-
-                                                               } else {
-                                                                   
log.error("Service " + clusterInstanceCreatedEvent.getServiceName() +
-                                                                           " 
not found, no cluster instance added to ClusterMonitor " +
-                                                                           
clusterInstanceCreatedEvent.getClusterId());
-                                                               }
-
-                                                           } finally {
-                                                               
TopologyManager.releaseReadLockForCluster(clusterInstanceCreatedEvent.getServiceName(),
-                                                                       
clusterInstanceCreatedEvent.getClusterId());
-                                                           }
-
-                                                       } else {
-                                                           log.error("No 
Cluster Monitor found for cluster id " +
-                                                                   
clusterInstanceCreatedEvent.getClusterId());
-                                                       }
-                                                   }
-                                               }
-
-        );
+            @Override
+            protected void onEvent(Event event) {
+
+                ClusterInstanceCreatedEvent clusterInstanceCreatedEvent = 
(ClusterInstanceCreatedEvent) event;
+                ClusterMonitor clusterMonitor = 
AutoscalerContext.getInstance().
+                        
getClusterMonitor(clusterInstanceCreatedEvent.getClusterId());
+                ClusterInstance clusterInstance = 
((ClusterInstanceCreatedEvent) event).
+                        getClusterInstance();
+                String instanceId = clusterInstance.getInstanceId();
+                //FIXME to take lock when clusterMonitor is running
+                if (clusterMonitor != null) {
+                    
TopologyManager.acquireReadLockForCluster(clusterInstanceCreatedEvent.getServiceName(),
+                            clusterInstanceCreatedEvent.getClusterId());
+
+                    try {
+                        Service service = TopologyManager.getTopology().
+                                
getService(clusterInstanceCreatedEvent.getServiceName());
+
+                        if (service != null) {
+                            Cluster cluster = 
service.getCluster(clusterInstanceCreatedEvent.getClusterId());
+                            if (cluster != null) {
+                                try {
+                                    ClusterContext clusterContext = 
(ClusterContext) clusterMonitor.getClusterContext();
+                                    if (clusterContext == null) {
+                                        clusterContext = 
ClusterContextFactory.getVMClusterContext(instanceId, cluster,
+                                                
clusterMonitor.hasScalingDependents(),
+                                                
clusterMonitor.getDeploymentPolicyId());
+                                        
clusterMonitor.setClusterContext(clusterContext);
+
+                                    }
+                                    log.info(String.format("Cluster monitor 
has scaling dependents: [%s]",
+                                            
Boolean.toString(clusterMonitor.hasScalingDependents())));
+                                    // TODO -- remove this log..
+                                    
clusterContext.addInstanceContext(instanceId, cluster,
+                                            
clusterMonitor.hasScalingDependents(),
+                                            
clusterMonitor.groupScalingEnabledSubtree());
+                                    if (clusterMonitor.getInstance(instanceId) 
== null) {
+                                        // adding the same instance in
+                                        // topology to monitor as a reference
+                                        ClusterInstance clusterInstance1 = 
cluster.getInstanceContexts(instanceId);
+                                        
clusterMonitor.addInstance(clusterInstance1);
+                                    }
+
+                                    if 
(clusterMonitor.hasMonitoringStarted().compareAndSet(false, true)) {
+                                        clusterMonitor.startScheduler();
+                                        log.info(String.format(
+                                                "Monitoring task for Cluster 
Monitor with [cluster-id] %s started "
+                                                        + "successfully", 
clusterInstanceCreatedEvent.getClusterId()));
+                                    } else {
+                                        //monitor already started. Invoking it
+                                        // directly to speed up the process
+                                        ((ClusterMonitor) 
clusterMonitor).monitor();
+                                    }
+                                } catch (PolicyValidationException e) {
+                                    log.error(e.getMessage(), e);
+                                } catch (PartitionValidationException e) {
+                                    log.error(e.getMessage(), e);
+                                }
+                            }
+
+                        } else {
+                            log.error(String.format("Service %s not found, no 
cluster instance added to ClusterMonitor",
+                                    
clusterInstanceCreatedEvent.getServiceName()));
+                        }
+
+                    } finally {
+                        
TopologyManager.releaseReadLockForCluster(clusterInstanceCreatedEvent.getServiceName(),
+                                clusterInstanceCreatedEvent.getClusterId());
+                    }
+
+                } else {
+                    log.error(String.format("No Cluster Monitor found for 
[cluster-id] %s",
+                            clusterInstanceCreatedEvent.getClusterId()));
+                }
+            }
+        });
     }
 
     /**
      * Terminate load balancer topology receiver thread.
      */
-
     public void terminate() {
         topologyEventReceiver.terminate();
         terminated = true;

Reply via email to