Remove unnecessary threads in messaging model

Conflicts:
        
components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
        
components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
        
components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
        
extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/8012f8c8
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/8012f8c8
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/8012f8c8

Branch: refs/heads/master
Commit: 8012f8c8d1b7ef808fae55d0f18826e651c04ed9
Parents: 9e6e91d
Author: gayan <[email protected]>
Authored: Mon Dec 1 16:25:22 2014 +0530
Committer: gayan <[email protected]>
Committed: Tue Dec 2 16:36:37 2014 +0530

----------------------------------------------------------------------
 .../AutoscalerTopologyEventReceiver.java        | 527 ++++++++++++++++++-
 .../internal/AutoscalerServerComponent.java     | 131 ++++-
 .../CloudControllerServiceComponent.java        | 142 ++++-
 .../application/ApplicationTopicReceiver.java   |  65 ++-
 .../status/ClusterStatusTopicReceiver.java      | 110 ++--
 .../status/InstanceStatusTopicReceiver.java     | 128 +++--
 .../extension/api/LoadBalancerExtension.java    | 256 +++++----
 .../extension/FaultHandlingWindowProcessor.java | 307 +++++++++--
 .../apache/stratos/haproxy/extension/Main.java  |  48 +-
 9 files changed, 1332 insertions(+), 382 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/8012f8c8/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
index bfdf30b..1f14542 100644
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
@@ -54,15 +54,16 @@ import java.util.concurrent.ExecutorService;
 /**
  * Autoscaler topology receiver.
  */
-public class AutoscalerTopologyEventReceiver{
+public class AutoscalerTopologyEventReceiver {
 
-    private static final Log log = 
LogFactory.getLog(AutoscalerTopologyEventReceiver.class);
+       private static final Log log = 
LogFactory.getLog(AutoscalerTopologyEventReceiver.class);
 
-    private TopologyEventReceiver topologyEventReceiver;
-    private boolean terminated;
-    private boolean topologyInitialized;
+       private TopologyEventReceiver topologyEventReceiver;
+       private boolean terminated;
+       private boolean topologyInitialized;
        private ExecutorService executorService;
 
+<<<<<<< HEAD
     public AutoscalerTopologyEventReceiver() {
         this.topologyEventReceiver = new TopologyEventReceiver();
         addEventListeners();
@@ -523,6 +524,461 @@ public class AutoscalerTopologyEventReceiver{
         topologyEventReceiver.terminate();
         terminated = true;
     }
+=======
+       public AutoscalerTopologyEventReceiver() {
+               this.topologyEventReceiver = new TopologyEventReceiver();
+               addEventListeners();
+       }
+
+       public void execute() {
+               //FIXME this activated before autoscaler deployer activated.
+
+               topologyEventReceiver.setExecutorService(executorService);
+               topologyEventReceiver.execute();
+
+               if (log.isInfoEnabled()) {
+                       log.info("Autoscaler topology receiver thread started");
+               }
+
+       }
+
+       private boolean allClustersInitialized(Application application) {
+               boolean allClustersInitialized = false;
+               for (ClusterDataHolder holder : 
application.getClusterDataRecursively()) {
+                       
TopologyManager.acquireReadLockForCluster(holder.getServiceType(),
+                                                                 
holder.getClusterId());
+
+                       try {
+                               Topology topology = 
TopologyManager.getTopology();
+                               if (topology != null) {
+                                       Service service = 
topology.getService(holder.getServiceType());
+                                       if (service != null) {
+                                               if 
(service.clusterExists(holder.getClusterId())) {
+                                                       allClustersInitialized 
= true;
+                                                       return 
allClustersInitialized;
+                                               } else {
+                                                       if 
(log.isDebugEnabled()) {
+                                                               
log.debug("[Cluster] " + holder.getClusterId() + " is not found in " +
+                                                                         "the 
Topology");
+                                                       }
+                                                       allClustersInitialized 
= false;
+                                               }
+                                       } else {
+                                               if (log.isDebugEnabled()) {
+                                                       log.debug("Service is 
null in the CompleteTopologyEvent");
+                                               }
+                                       }
+                               } else {
+                                       if (log.isDebugEnabled()) {
+                                               log.debug("Topology is null in 
the CompleteTopologyEvent");
+                                       }
+                               }
+                       } finally {
+                               
TopologyManager.releaseReadLockForCluster(holder.getServiceType(),
+                                                                         
holder.getClusterId());
+                       }
+               }
+               return allClustersInitialized;
+       }
+
+       private void addEventListeners() {
+               // Listen to topology events that affect clusters
+               topologyEventReceiver.addEventListener(new 
CompleteTopologyEventListener() {
+                       @Override
+                       protected void onEvent(Event event) {
+                               if (!topologyInitialized) {
+                                       log.info("[CompleteTopologyEvent] 
Received: " + event.getClass());
+                                       ApplicationHolder.acquireReadLock();
+                                       try {
+                                               Applications applications = 
ApplicationHolder.getApplications();
+                                               if (applications != null) {
+                                                       for (Application 
application : applications.getApplications().values()) {
+                                                               if 
(allClustersInitialized(application)) {
+                                                                       
startApplicationMonitor(application.getUniqueIdentifier());
+                                                               } else {
+                                                                       
log.error("Complete Topology is not consistent with the applications " +
+                                                                               
  "which got persisted");
+                                                               }
+                                                       }
+                                                       topologyInitialized = 
true;
+                                               } else {
+                                                       log.info("No 
applications found in the complete topology");
+                                               }
+                                       } catch (Exception e) {
+                                               log.error("Error processing 
event", e);
+                                       } finally {
+                                               
ApplicationHolder.releaseReadLock();
+                                       }
+                               }
+                       }
+               });
+
+               topologyEventReceiver.addEventListener(new 
ApplicationClustersCreatedEventListener() {
+                       @Override
+                       protected void onEvent(Event event) {
+                               try {
+                                       
log.info("[ApplicationClustersCreatedEvent] Received: " + event.getClass());
+                                       ApplicationClustersCreatedEvent 
applicationClustersCreatedEvent =
+                                                       
(ApplicationClustersCreatedEvent) event;
+                                       String appId = 
applicationClustersCreatedEvent.getAppId();
+                                       try {
+                                               //acquire read lock
+                                               
ApplicationHolder.acquireReadLock();
+                                               //start the application monitor
+                                               startApplicationMonitor(appId);
+                                       } catch (Exception e) {
+                                               String msg = "Error processing 
event " + e.getLocalizedMessage();
+                                               log.error(msg, e);
+                                       } finally {
+                                               //release read lock
+                                               
ApplicationHolder.releaseReadLock();
+
+                                       }
+                               } catch (ClassCastException e) {
+                                       String msg = "Error while casting the 
event " + e.getLocalizedMessage();
+                                       log.error(msg, e);
+                               }
+
+                       }
+               });
+
+               topologyEventReceiver.addEventListener(new 
ClusterActivatedEventListener() {
+                       @Override
+                       protected void onEvent(Event event) {
+                               log.info("[ClusterActivatedEvent] Received: " + 
event.getClass());
+                               ClusterActivatedEvent clusterActivatedEvent = 
(ClusterActivatedEvent) event;
+                               String clusterId = 
clusterActivatedEvent.getClusterId();
+                               AutoscalerContext asCtx = 
AutoscalerContext.getInstance();
+                               AbstractClusterMonitor monitor;
+                               monitor = asCtx.getClusterMonitor(clusterId);
+                               if (null == monitor) {
+                                       if (log.isDebugEnabled()) {
+                                               log.debug(String.format("A 
cluster monitor is not found in autoscaler context "
+                                                                       + 
"[cluster] %s", clusterId));
+                                       }
+                                       return;
+                               }
+                               //changing the status in the monitor, will 
notify its parent monitor
+
+                       }
+               });
+
+               topologyEventReceiver.addEventListener(new 
ClusterResetEventListener() {
+                       @Override
+                       protected void onEvent(Event event) {
+                               log.info("[ClusterCreatedEvent] Received: " + 
event.getClass());
+                               ClusterResetEvent clusterResetEvent = 
(ClusterResetEvent) event;
+                               String clusterId = 
clusterResetEvent.getClusterId();
+                               AutoscalerContext asCtx = 
AutoscalerContext.getInstance();
+                               AbstractClusterMonitor monitor;
+                               monitor = asCtx.getClusterMonitor(clusterId);
+                               if (null == monitor) {
+                                       if (log.isDebugEnabled()) {
+                                               log.debug(String.format("A 
cluster monitor is not found in autoscaler context "
+                                                                       + 
"[cluster] %s", clusterId));
+                                       }
+                                       return;
+                               }
+                               //changing the status in the monitor, will 
notify its parent monitor
+                               monitor.destroy();
+                               monitor.setStatus(ClusterStatus.Created);
+
+                       }
+               });
+
+               topologyEventReceiver.addEventListener(new 
ClusterCreatedEventListener() {
+                       @Override
+                       protected void onEvent(Event event) {
+                               log.info("[ClusterCreatedEvent] Received: " + 
event.getClass());
+                       }
+               });
+
+               topologyEventReceiver.addEventListener(new 
ClusterInActivateEventListener() {
+                       @Override
+                       protected void onEvent(Event event) {
+                               log.info("[ClusterInActivateEvent] Received: " 
+ event.getClass());
+                               ClusterInactivateEvent clusterInactivateEvent = 
(ClusterInactivateEvent) event;
+                               String clusterId = 
clusterInactivateEvent.getClusterId();
+                               AutoscalerContext asCtx = 
AutoscalerContext.getInstance();
+                               AbstractClusterMonitor monitor;
+                               monitor = asCtx.getClusterMonitor(clusterId);
+                               if (null == monitor) {
+                                       if (log.isDebugEnabled()) {
+                                               log.debug(String.format("A 
cluster monitor is not found in autoscaler context "
+                                                                       + 
"[cluster] %s", clusterId));
+                                       }
+                                       return;
+                               }
+                               //changing the status in the monitor, will 
notify its parent monitor
+                               monitor.setStatus(ClusterStatus.Inactive);
+                       }
+               });
+
+               topologyEventReceiver.addEventListener(new 
ClusterTerminatingEventListener() {
+                       @Override
+                       protected void onEvent(Event event) {
+                               log.info("[ClusterTerminatingEvent] Received: " 
+ event.getClass());
+                               ClusterTerminatingEvent clusterTerminatingEvent 
= (ClusterTerminatingEvent) event;
+                               String clusterId = 
clusterTerminatingEvent.getClusterId();
+                               String instanceId = 
clusterTerminatingEvent.getInstanceId();
+                               AutoscalerContext asCtx = 
AutoscalerContext.getInstance();
+                               AbstractClusterMonitor monitor;
+                               monitor = asCtx.getClusterMonitor(clusterId);
+                               if (null == monitor) {
+                                       if (log.isDebugEnabled()) {
+                                               log.debug(String.format("A 
cluster monitor is not found in autoscaler context "
+                                                                       + 
"[cluster] %s", clusterId));
+                                       }
+                                       // if monitor does not exist, send 
cluster terminated event
+                                       
ClusterStatusEventPublisher.sendClusterTerminatedEvent(clusterTerminatingEvent.getAppId(),
+                                                                               
               clusterTerminatingEvent.getServiceName(),
+                                                                               
               clusterId, instanceId);
+                                       return;
+                               }
+                               //changing the status in the monitor, will 
notify its parent monitor
+                               if (monitor.getStatus() == 
ClusterStatus.Active) {
+                                       // terminated gracefully
+                                       
monitor.setStatus(ClusterStatus.Terminating);
+                                       
InstanceNotificationPublisher.sendInstanceCleanupEventForCluster(clusterId);
+                               } else {
+                                       
monitor.setStatus(ClusterStatus.Terminating);
+                                       monitor.terminateAllMembers();
+                               }
+                               
ServiceReferenceHolder.getInstance().getClusterStatusProcessorChain().
+                                               process("", clusterId, 
instanceId);
+                       }
+               });
+
+               topologyEventReceiver.addEventListener(new 
ClusterTerminatedEventListener() {
+                       @Override
+                       protected void onEvent(Event event) {
+                               log.info("[ClusterTerminatedEvent] Received: " 
+ event.getClass());
+                               ClusterTerminatedEvent clusterTerminatedEvent = 
(ClusterTerminatedEvent) event;
+                               String clusterId = 
clusterTerminatedEvent.getClusterId();
+                               AutoscalerContext asCtx = 
AutoscalerContext.getInstance();
+                               AbstractClusterMonitor monitor;
+                               monitor = asCtx.getClusterMonitor(clusterId);
+                               if (null == monitor) {
+                                       if (log.isDebugEnabled()) {
+                                               log.debug(String.format("A 
cluster monitor is not found in autoscaler context "
+                                                                       + 
"[cluster] %s", clusterId));
+                                       }
+                                       // if the cluster monitor is null, 
assume that it's terminated
+                                       ApplicationMonitor appMonitor =
+                                                       
AutoscalerContext.getInstance().getAppMonitor(clusterTerminatedEvent.getAppId());
+                                       if (appMonitor != null) {
+                                               appMonitor
+                                                               
.onChildStatusEvent(new ClusterStatusEvent(ClusterStatus.Terminated, clusterId, 
null));
+                                       }
+                                       return;
+                               }
+                               //changing the status in the monitor, will 
notify its parent monitor
+                               monitor.setStatus(ClusterStatus.Terminated);
+                               //Destroying and Removing the Cluster monitor
+                               monitor.destroy();
+                               
AutoscalerContext.getInstance().removeClusterMonitor(clusterId);
+                       }
+               });
+
+               topologyEventReceiver.addEventListener(new 
MemberReadyToShutdownEventListener() {
+                       @Override
+                       protected void onEvent(Event event) {
+                               try {
+                                       MemberReadyToShutdownEvent 
memberReadyToShutdownEvent = (MemberReadyToShutdownEvent) event;
+                                       String clusterId = 
memberReadyToShutdownEvent.getClusterId();
+                                       AutoscalerContext asCtx = 
AutoscalerContext.getInstance();
+                                       AbstractClusterMonitor monitor;
+                                       monitor = 
asCtx.getClusterMonitor(clusterId);
+                                       if (null == monitor) {
+                                               if (log.isDebugEnabled()) {
+                                                       
log.debug(String.format("A cluster monitor is not found in autoscaler context "
+                                                                               
+ "[cluster] %s", clusterId));
+                                               }
+                                               return;
+                                       }
+                                       
monitor.handleMemberReadyToShutdownEvent(memberReadyToShutdownEvent);
+                               } catch (Exception e) {
+                                       String msg = "Error processing event " 
+ e.getLocalizedMessage();
+                                       log.error(msg, e);
+                               }
+                       }
+               });
+
+               topologyEventReceiver.addEventListener(new 
MemberStartedEventListener() {
+                       @Override
+                       protected void onEvent(Event event) {
+
+                       }
+               });
+
+               topologyEventReceiver.addEventListener(new 
MemberTerminatedEventListener() {
+                       @Override
+                       protected void onEvent(Event event) {
+                               try {
+                                       MemberTerminatedEvent 
memberTerminatedEvent = (MemberTerminatedEvent) event;
+                                       String clusterId = 
memberTerminatedEvent.getClusterId();
+                                       AbstractClusterMonitor monitor;
+                                       AutoscalerContext asCtx = 
AutoscalerContext.getInstance();
+                                       monitor = 
asCtx.getClusterMonitor(clusterId);
+                                       if (null == monitor) {
+                                               if (log.isDebugEnabled()) {
+                                                       
log.debug(String.format("A cluster monitor is not found in autoscaler context "
+                                                                               
+ "[cluster] %s", clusterId));
+                                               }
+                                               return;
+                                       }
+                                       
monitor.handleMemberTerminatedEvent(memberTerminatedEvent);
+                               } catch (Exception e) {
+                                       String msg = "Error processing event " 
+ e.getLocalizedMessage();
+                                       log.error(msg, e);
+                               }
+                       }
+               });
+
+               topologyEventReceiver.addEventListener(new 
MemberActivatedEventListener() {
+                       @Override
+                       protected void onEvent(Event event) {
+                               try {
+                                       MemberActivatedEvent 
memberActivatedEvent = (MemberActivatedEvent) event;
+                                       String clusterId = 
memberActivatedEvent.getClusterId();
+                                       AbstractClusterMonitor monitor;
+                                       AutoscalerContext asCtx = 
AutoscalerContext.getInstance();
+                                       monitor = 
asCtx.getClusterMonitor(clusterId);
+                                       if (null == monitor) {
+                                               if (log.isDebugEnabled()) {
+                                                       
log.debug(String.format("A cluster monitor is not found in autoscaler context "
+                                                                               
+ "[cluster] %s", clusterId));
+                                               }
+                                               return;
+                                       }
+                                       
monitor.handleMemberActivatedEvent(memberActivatedEvent);
+                               } catch (Exception e) {
+                                       String msg = "Error processing event " 
+ e.getLocalizedMessage();
+                                       log.error(msg, e);
+                               }
+                       }
+               });
+
+               topologyEventReceiver.addEventListener(new 
MemberMaintenanceListener() {
+                       @Override
+                       protected void onEvent(Event event) {
+                               try {
+                                       MemberMaintenanceModeEvent 
maintenanceModeEvent = (MemberMaintenanceModeEvent) event;
+                                       String clusterId = 
maintenanceModeEvent.getClusterId();
+                                       AbstractClusterMonitor monitor;
+                                       AutoscalerContext asCtx = 
AutoscalerContext.getInstance();
+                                       monitor = 
asCtx.getClusterMonitor(clusterId);
+                                       if (null == monitor) {
+                                               if (log.isDebugEnabled()) {
+                                                       
log.debug(String.format("A cluster monitor is not found in autoscaler context "
+                                                                               
+ "[cluster] %s", clusterId));
+                                               }
+                                               return;
+                                       }
+                                       
monitor.handleMemberMaintenanceModeEvent(maintenanceModeEvent);
+                               } catch (Exception e) {
+                                       String msg = "Error processing event " 
+ e.getLocalizedMessage();
+                                       log.error(msg, e);
+                               }
+                       }
+               });
+
+               topologyEventReceiver.addEventListener(new 
ClusterInstanceCreatedEventListener() {
+                       @Override
+                       protected void onEvent(Event event) {
+
+                               ClusterInstanceCreatedEvent 
clusterInstanceCreatedEvent =
+                                               (ClusterInstanceCreatedEvent) 
event;
+                               AbstractClusterMonitor clusterMonitor = 
AutoscalerContext.getInstance().
+                                               
getClusterMonitor(clusterInstanceCreatedEvent.getClusterId());
+
+                               if (clusterMonitor != null) {
+                                       
TopologyManager.acquireReadLockForCluster(clusterInstanceCreatedEvent.getServiceName(),
+                                                                               
  clusterInstanceCreatedEvent.getClusterId());
+
+                                       try {
+                                               Service service = 
TopologyManager.getTopology().
+                                                               
getService(clusterInstanceCreatedEvent.getServiceName());
+
+                                               if (service != null) {
+                                                       Cluster cluster = 
service.getCluster(clusterInstanceCreatedEvent.getClusterId());
+                                                       if (cluster != null) {
+                                                               // create and 
add Cluster Context
+                                                               try {
+                                                                       if 
(cluster.isKubernetesCluster()) {
+                                                                               
clusterMonitor.addClusterContextForInstance(
+                                                                               
                clusterInstanceCreatedEvent.getInstanceId(),
+                                                                               
                ClusterContextFactory.getKubernetesClusterContext(cluster));
+                                                                       } else 
if (cluster.isLbCluster()) {
+                                                                               
clusterMonitor.addClusterContextForInstance(
+                                                                               
                clusterInstanceCreatedEvent.getInstanceId(),
+                                                                               
                ClusterContextFactory.getVMLBClusterContext(cluster));
+                                                                       } else {
+                                                                               
clusterMonitor.addClusterContextForInstance(
+                                                                               
                clusterInstanceCreatedEvent.getInstanceId(),
+                                                                               
                ClusterContextFactory.getVMServiceClusterContext(cluster));
+                                                                       }
+
+                                                                       if 
(clusterMonitor.hasMonitoringStarted().compareAndSet(false, true)) {
+                                                                               
clusterMonitor.startScheduler();
+                                                                               
log.info("Monitoring task for Cluster Monitor with cluster id " +
+                                                                               
         clusterInstanceCreatedEvent.getClusterId() + " started successfully");
+                                                                       }
+
+                                                               } catch 
(PolicyValidationException e) {
+                                                                       
log.error(e.getMessage(), e);
+                                                               } catch 
(PartitionValidationException e) {
+                                                                       
log.error(e.getMessage(), e);
+                                                               }
+
+                                                       } else {
+                                                               
log.error("Cluster not found for " + clusterInstanceCreatedEvent.getClusterId() 
+
+                                                                         ", no 
cluster instance added to ClusterMonitor " +
+                                                                         
clusterInstanceCreatedEvent.getClusterId());
+                                                       }
+                                               } else {
+                                                       log.error("Service " + 
clusterInstanceCreatedEvent.getServiceName() +
+                                                                 " not found, 
no cluster instance added to ClusterMonitor " +
+                                                                 
clusterInstanceCreatedEvent.getClusterId());
+                                               }
+
+                                       } finally {
+                                               
TopologyManager.releaseReadLockForCluster(clusterInstanceCreatedEvent.getServiceName(),
+                                                                               
          clusterInstanceCreatedEvent.getClusterId());
+                                       }
+
+                               } else {
+                                       log.error("No Cluster Monitor found for 
cluster id " +
+                                                 
clusterInstanceCreatedEvent.getClusterId());
+                               }
+                       }
+               });
+       }
+
+       /**
+        * Terminate autoscaler topology receiver thread.
+        */
+       public void terminate() {
+               topologyEventReceiver.terminate();
+               terminated = true;
+       }
+
+       protected synchronized void startApplicationMonitor(String 
applicationId) {
+               Thread th = null;
+               if 
(AutoscalerContext.getInstance().getAppMonitor(applicationId) == null) {
+                       th = new Thread(new 
ApplicationMonitorAdder(applicationId));
+               }
+               if (th != null) {
+                       th.start();
+               } else {
+                       if (log.isDebugEnabled()) {
+                               log.debug(String
+                                                         .format("Application 
monitor thread already exists: " +
+                                                                 
"[application] %s ", applicationId));
+                       }
+               }
+       }
+>>>>>>> ddf277b... Remove unnessary threads in messaging model
 
        public ExecutorService getExecutorService() {
                return executorService;
@@ -531,4 +987,65 @@ public class AutoscalerTopologyEventReceiver{
        public void setExecutorService(ExecutorService executorService) {
                this.executorService = executorService;
        }
+<<<<<<< HEAD
+=======
+
+       private class ApplicationMonitorAdder implements Runnable {
+               private String appId;
+
+               public ApplicationMonitorAdder(String appId) {
+                       this.appId = appId;
+               }
+
+               public void run() {
+                       ApplicationMonitor applicationMonitor = null;
+                       int retries = 5;
+                       boolean success = false;
+                       do {
+                               try {
+                                       Thread.sleep(5000);
+                               } catch (InterruptedException e1) {
+                               }
+                               try {
+                                       long start = System.currentTimeMillis();
+                                       if (log.isDebugEnabled()) {
+                                               log.debug("application monitor 
is going to be started for [application] " +
+                                                         appId);
+                                       }
+                                       try {
+                                               applicationMonitor = 
MonitorFactory.getApplicationMonitor(appId);
+                                       } catch (PolicyValidationException e) {
+                                               String msg = "Application 
monitor creation failed for Application: ";
+                                               log.warn(msg, e);
+                                               retries--;
+                                       }
+                                       long end = System.currentTimeMillis();
+                                       log.info("Time taken to start app 
monitor: " + (end - start) / 1000);
+                                       success = true;
+                               } catch (DependencyBuilderException e) {
+                                       String msg = "Application monitor 
creation failed for Application: ";
+                                       log.warn(msg, e);
+                                       retries--;
+                               } catch (TopologyInConsistentException e) {
+                                       String msg = "Application monitor 
creation failed for Application: ";
+                                       log.warn(msg, e);
+                                       retries--;
+                               }
+                       } while (!success && retries != 0);
+
+                       if (applicationMonitor == null) {
+                               String msg = "Application monitor creation 
failed, even after retrying for 5 times, "
+                                            + "for Application: " + appId;
+                               log.error(msg);
+                               throw new RuntimeException(msg);
+                       }
+
+                       
AutoscalerContext.getInstance().addAppMonitor(applicationMonitor);
+                       if (log.isInfoEnabled()) {
+                               log.info(String.format("Application monitor has 
been added successfully: " +
+                                                      "[application] %s", 
applicationMonitor.getId()));
+                       }
+               }
+       }
+>>>>>>> ddf277b... Remove unnessary threads in messaging model
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/8012f8c8/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
index 3694066..2e443de 100644
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
@@ -65,16 +65,16 @@ public class AutoscalerServerComponent {
 
        private static final String THREAD_IDENTIFIER_KEY = 
"threadPool.autoscaler.identifier";
        private static final String DEFAULT_IDENTIFIER = "Auto-Scaler";
-       private static final String THREAD_POOL_SIZE_KEY= 
"threadPool.autoscaler.threadPoolSize";
+       private static final String THREAD_POOL_SIZE_KEY = 
"threadPool.autoscaler.threadPoolSize";
        private static final String COMPONENTS_CONFIG = "components-config";
        private static final int THREAD_POOL_SIZE = 10;
        private static final Log log = 
LogFactory.getLog(AutoscalerServerComponent.class);
 
-
        private AutoscalerTopologyEventReceiver asTopologyReceiver;
-    private AutoscalerHealthStatEventReceiver 
autoscalerHealthStatEventReceiver;
+       private AutoscalerHealthStatEventReceiver 
autoscalerHealthStatEventReceiver;
 
        protected void activate(ComponentContext componentContext) throws 
Exception {
+<<<<<<< HEAD
         try {
             // Start topology receiver
                XMLConfiguration conf = 
ConfUtil.getInstance(COMPONENTS_CONFIG).getConfiguration();
@@ -197,3 +197,128 @@ public class AutoscalerServerComponent {
         ServiceReferenceHolder.getInstance().setTaskService(null);
     }
 }
+=======
+               try {
+                       // Start topology receiver
+                       XMLConfiguration conf = 
ConfUtil.getInstance(COMPONENTS_CONFIG).getConfiguration();
+                       int threadPoolSize = conf.getInt(THREAD_POOL_SIZE_KEY, 
THREAD_POOL_SIZE);
+                       String threadIdentifier = 
conf.getString(THREAD_IDENTIFIER_KEY, DEFAULT_IDENTIFIER);
+                       ExecutorService executorService = 
StratosThreadPool.getExecutorService(threadIdentifier, threadPoolSize);
+                       asTopologyReceiver = new 
AutoscalerTopologyEventReceiver();
+                       asTopologyReceiver.setExecutorService(executorService);
+                       asTopologyReceiver.execute();
+
+                       if (log.isDebugEnabled()) {
+                               log.debug("Topology receiver executor service 
started");
+                       }
+
+                       // Start health stat receiver
+                       autoscalerHealthStatEventReceiver = new 
AutoscalerHealthStatEventReceiver();
+                       Thread healthDelegatorThread = new 
Thread(autoscalerHealthStatEventReceiver);
+                       healthDelegatorThread.start();
+                       if (log.isDebugEnabled()) {
+                               log.debug("Health statistics receiver thread 
started");
+                       }
+
+                       // Adding the registry stored partitions to the 
information model
+                       List<Partition> partitions = 
RegistryManager.getInstance().retrievePartitions();
+                       Iterator<Partition> partitionIterator = 
partitions.iterator();
+                       while (partitionIterator.hasNext()) {
+                               Partition partition = partitionIterator.next();
+                               
PartitionManager.getInstance().addPartitionToInformationModel(partition);
+                       }
+
+                       // Adding the network partitions stored in registry to 
the information model
+                       List<NetworkPartitionLbHolder> nwPartitionHolders =
+                                       
RegistryManager.getInstance().retrieveNetworkPartitionLbHolders();
+                       Iterator<NetworkPartitionLbHolder> nwPartitionIterator 
= nwPartitionHolders.iterator();
+                       while (nwPartitionIterator.hasNext()) {
+                               NetworkPartitionLbHolder nwPartition = 
nwPartitionIterator.next();
+                               
PartitionManager.getInstance().addNetworkPartitionLbHolder(nwPartition);
+                       }
+
+                       List<AutoscalePolicy> asPolicies = 
RegistryManager.getInstance().retrieveASPolicies();
+                       Iterator<AutoscalePolicy> asPolicyIterator = 
asPolicies.iterator();
+                       while (asPolicyIterator.hasNext()) {
+                               AutoscalePolicy asPolicy = 
asPolicyIterator.next();
+                               
PolicyManager.getInstance().addASPolicyToInformationModel(asPolicy);
+                       }
+
+                       List<DeploymentPolicy> depPolicies = 
RegistryManager.getInstance().retrieveDeploymentPolicies();
+                       Iterator<DeploymentPolicy> depPolicyIterator = 
depPolicies.iterator();
+                       while (depPolicyIterator.hasNext()) {
+                               DeploymentPolicy depPolicy = 
depPolicyIterator.next();
+                               
PolicyManager.getInstance().addDeploymentPolicyToInformationModel(depPolicy);
+                       }
+
+                       // Adding KubernetesGroups stored in registry to the 
information model
+                       List<KubernetesGroup> kubernetesGroupList = 
RegistryManager.getInstance().retrieveKubernetesGroups();
+                       Iterator<KubernetesGroup> kubernetesGroupIterator = 
kubernetesGroupList.iterator();
+                       while (kubernetesGroupIterator.hasNext()) {
+                               KubernetesGroup kubernetesGroup = 
kubernetesGroupIterator.next();
+                               
KubernetesManager.getInstance().addNewKubernetesGroup(kubernetesGroup);
+                       }
+
+                       //starting the processor chain
+                       ClusterStatusProcessorChain clusterStatusProcessorChain 
= new ClusterStatusProcessorChain();
+                       
ServiceReferenceHolder.getInstance().setClusterStatusProcessorChain(clusterStatusProcessorChain);
+
+                       GroupStatusProcessorChain groupStatusProcessorChain = 
new GroupStatusProcessorChain();
+                       
ServiceReferenceHolder.getInstance().setGroupStatusProcessorChain(groupStatusProcessorChain);
+
+                       if (log.isInfoEnabled()) {
+                               log.info("Scheduling tasks to publish 
applications");
+                       }
+
+                       ApplicationSynchronizerTaskScheduler
+                                       
.schedule(ServiceReferenceHolder.getInstance()
+                                                                       
.getTaskService());
+
+                       if (log.isInfoEnabled()) {
+                               log.info("Autoscaler server Component 
activated");
+                       }
+               } catch (Throwable e) {
+                       log.error("Error in activating the autoscaler component 
", e);
+               }
+       }
+
+       protected void deactivate(ComponentContext context) {
+               asTopologyReceiver.terminate();
+               autoscalerHealthStatEventReceiver.terminate();
+       }
+
+       protected void setRegistryService(RegistryService registryService) {
+               if (log.isDebugEnabled()) {
+                       log.debug("Setting the Registry Service");
+               }
+               try {
+                       
ServiceReferenceHolder.getInstance().setRegistry(registryService.getGovernanceSystemRegistry());
+               } catch (RegistryException e) {
+                       String msg = "Failed when retrieving Governance System 
Registry.";
+                       log.error(msg, e);
+                       throw new AutoScalerException(msg, e);
+               }
+       }
+
+       protected void unsetRegistryService(RegistryService registryService) {
+               if (log.isDebugEnabled()) {
+                       log.debug("Un-setting the Registry Service");
+               }
+               ServiceReferenceHolder.getInstance().setRegistry(null);
+       }
+
+       protected void setTaskService(TaskService taskService) {
+               if (log.isDebugEnabled()) {
+                       log.debug("Setting the Task Service");
+               }
+               
ServiceReferenceHolder.getInstance().setTaskService(taskService);
+       }
+
+       protected void unsetTaskService(TaskService taskService) {
+               if (log.isDebugEnabled()) {
+                       log.debug("Un-setting the Task Service");
+               }
+               ServiceReferenceHolder.getInstance().setTaskService(null);
+       }
+}
+>>>>>>> ddf277b... Remove unnessary threads in messaging model

http://git-wip-us.apache.org/repos/asf/stratos/blob/8012f8c8/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
index a47e924..dfbf1ec 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
@@ -20,9 +20,12 @@ package org.apache.stratos.cloud.controller.internal;
  *
 */
 
+<<<<<<< HEAD
 
 import com.hazelcast.core.HazelcastInstance;
 
+=======
+>>>>>>> ddf277b... Remove unnessary threads in messaging model
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.cloud.controller.context.CloudControllerContext;
@@ -49,6 +52,7 @@ import org.wso2.carbon.utils.ConfigurationContextService;
  * Registering Cloud Controller Service.
  *
  * @scr.component name="org.apache.stratos.cloud.controller" immediate="true"
+<<<<<<< HEAD
  * @scr.reference name="distributedObjectProvider" 
interface="org.apache.stratos.common.clustering.DistributedObjectProvider"
  *                cardinality="1..1" policy="dynamic" 
bind="setDistributedObjectProvider" unbind="unsetDistributedObjectProvider"
  * @scr.reference name="ntask.component" 
interface="org.wso2.carbon.ntask.core.service.TaskService"
@@ -57,42 +61,72 @@ import org.wso2.carbon.utils.ConfigurationContextService;
  *                cardinality="1..1" policy="dynamic" 
bind="setRegistryService" unbind="unsetRegistryService"
  * @scr.reference name="config.context.service" 
interface="org.wso2.carbon.utils.ConfigurationContextService"
  *                cardinality="1..1" policy="dynamic" 
bind="setConfigurationContextService" unbind="unsetConfigurationContextService"
+=======
+ * @scr.reference name="distributedMapProvider" 
interface="org.wso2.carbon.caching.impl.DistributedMapProvider"
+ * cardinality="1..1" policy="dynamic" bind="setDistributedMapProvider" 
unbind="unsetDistributedMapProvider"
+ * @scr.reference name="ntask.component"
+ * interface="org.wso2.carbon.ntask.core.service.TaskService"
+ * cardinality="1..1" policy="dynamic" bind="setTaskService" 
unbind="unsetTaskService"
+ * @scr.reference name="registry.service"
+ * interface="org.wso2.carbon.registry.core.service.RegistryService"
+ * cardinality="1..1" policy="dynamic" bind="setRegistryService" 
unbind="unsetRegistryService"
+ * @scr.reference name="config.context.service"
+ * interface="org.wso2.carbon.utils.ConfigurationContextService"
+ * cardinality="1..1" policy="dynamic" bind="setConfigurationContextService" 
unbind="unsetConfigurationContextService"
+>>>>>>> ddf277b... Remove unnessary threads in messaging model
  */
 public class CloudControllerServiceComponent {
 
-    private static final Log log = 
LogFactory.getLog(CloudControllerServiceComponent.class);
-    private ClusterStatusTopicReceiver clusterStatusTopicReceiver;
-    private InstanceStatusTopicReceiver instanceStatusTopicReceiver;
-    private ApplicationTopicReceiver applicationTopicReceiver;
+       private static final Log log = 
LogFactory.getLog(CloudControllerServiceComponent.class);
+       private ClusterStatusTopicReceiver clusterStatusTopicReceiver;
+       private InstanceStatusTopicReceiver instanceStatusTopicReceiver;
+       private ApplicationTopicReceiver applicationTopicReceiver;
 
-    protected void activate(ComponentContext context) {
-        try {
-            applicationTopicReceiver = new ApplicationTopicReceiver();
-            applicationTopicReceiver.execute();
+       protected void activate(ComponentContext context) {
+               try {
+                       applicationTopicReceiver = new 
ApplicationTopicReceiver();
+                       applicationTopicReceiver.execute();
 
+                       if (log.isInfoEnabled()) {
+                               log.info("Application Receiver thread started");
+                       }
+
+<<<<<<< HEAD
             if (log.isInfoEnabled()) {
                 log.info("Application event receiver thread started");
             }
+=======
+                       clusterStatusTopicReceiver = new 
ClusterStatusTopicReceiver();
+                       clusterStatusTopicReceiver.execute();
+>>>>>>> ddf277b... Remove unnessary threads in messaging model
 
-            clusterStatusTopicReceiver = new ClusterStatusTopicReceiver();
-               clusterStatusTopicReceiver.execute();
+                       if (log.isInfoEnabled()) {
+                               log.info("Cluster status Receiver thread 
started");
+                       }
 
+<<<<<<< HEAD
             if (log.isInfoEnabled()) {
                 log.info("Cluster status receiver thread started");
             }
+=======
+                       instanceStatusTopicReceiver = new 
InstanceStatusTopicReceiver();
+                       instanceStatusTopicReceiver.execute();
+>>>>>>> ddf277b... Remove unnessary threads in messaging model
 
-            instanceStatusTopicReceiver = new InstanceStatusTopicReceiver();
-            instanceStatusTopicReceiver.execute();
+                       if (log.isInfoEnabled()) {
+                               log.info("Instance status message receiver 
thread started");
+                       }
 
-            if(log.isInfoEnabled()) {
-                log.info("Instance status message receiver thread started");
-            }
+                       // Register cloud controller service
+                       BundleContext bundleContext = 
context.getBundleContext();
+                       
bundleContext.registerService(CloudControllerService.class.getName(),
+                                                     new 
CloudControllerServiceImpl(), null);
 
-               // Register cloud controller service
-            BundleContext bundleContext = context.getBundleContext();
-            
bundleContext.registerService(CloudControllerService.class.getName(),
-                    new CloudControllerServiceImpl(), null);
+                       if (log.isInfoEnabled()) {
+                               log.info("Scheduling tasks");
+                       }
 
+<<<<<<< HEAD
             if(log.isInfoEnabled()) {
                 log.info("Scheduling tasks");
             }
@@ -123,29 +157,71 @@ public class CloudControllerServiceComponent {
         ServiceReferenceHolder.getInstance().setTaskService(null);
     }
     
+=======
+                       TopologySynchronizerTaskScheduler
+                                       
.schedule(ServiceReferenceHolder.getInstance()
+                                                                       
.getTaskService());
+
+               } catch (Throwable e) {
+                       log.error("******* Cloud Controller Service bundle is 
failed to activate ****", e);
+               }
+       }
+
+       protected void setTaskService(TaskService taskService) {
+               if (log.isDebugEnabled()) {
+                       log.debug("Setting the Task Service");
+               }
+               
ServiceReferenceHolder.getInstance().setTaskService(taskService);
+       }
+
+       protected void unsetTaskService(TaskService taskService) {
+               if (log.isDebugEnabled()) {
+                       log.debug("Unsetting the Task Service");
+               }
+               ServiceReferenceHolder.getInstance().setTaskService(null);
+       }
+
+>>>>>>> ddf277b... Remove unnessary threads in messaging model
        protected void setRegistryService(RegistryService registryService) {
                if (log.isDebugEnabled()) {
                        log.debug("Setting the Registry Service");
                }
-               
-               try {                   
+
+               try {
                        UserRegistry registry = 
registryService.getGovernanceSystemRegistry();
+<<<<<<< HEAD
                ServiceReferenceHolder.getInstance().setRegistry(registry);
         } catch (RegistryException e) {
                String msg = "Failed when retrieving Governance System 
Registry.";
                log.error(msg, e);
                throw new CloudControllerException(msg, e);
         } 
+=======
+                       ServiceReferenceHolder.getInstance()
+                                             .setRegistry(registry);
+               } catch (RegistryException e) {
+                       String msg = "Failed when retrieving Governance System 
Registry.";
+                       log.error(msg, e);
+                       throw new CloudControllerException(msg, e);
+               }
+>>>>>>> ddf277b... Remove unnessary threads in messaging model
        }
 
        protected void unsetRegistryService(RegistryService registryService) {
                if (log.isDebugEnabled()) {
+<<<<<<< HEAD
             log.debug("Un-setting the Registry Service");
         }
         ServiceReferenceHolder.getInstance().setRegistry(null);
+=======
+                       log.debug("Unsetting the Registry Service");
+               }
+               ServiceReferenceHolder.getInstance().setRegistry(null);
+>>>>>>> ddf277b... Remove unnessary threads in messaging model
        }
-       
+
        protected void 
setConfigurationContextService(ConfigurationContextService cfgCtxService) {
+<<<<<<< HEAD
         ServiceReferenceHolder.getInstance().setAxisConfiguration(
                 cfgCtxService.getServerConfigContext().getAxisConfiguration());
     }
@@ -162,8 +238,26 @@ public class CloudControllerServiceComponent {
         
ServiceReferenceHolder.getInstance().setDistributedObjectProvider(null);
     }
        
+=======
+               ServiceReferenceHolder.getInstance().setAxisConfiguration(
+                               
cfgCtxService.getServerConfigContext().getAxisConfiguration());
+       }
+
+       protected void 
unsetConfigurationContextService(ConfigurationContextService cfgCtxService) {
+               ServiceReferenceHolder.getInstance().setAxisConfiguration(null);
+       }
+
+       protected void setDistributedMapProvider(DistributedMapProvider 
mapProvider) {
+               
ServiceReferenceHolder.getInstance().setDistributedMapProvider(mapProvider);
+       }
+
+       protected void unsetDistributedMapProvider(DistributedMapProvider 
mapProvider) {
+               
ServiceReferenceHolder.getInstance().setDistributedMapProvider(null);
+       }
+
+>>>>>>> ddf277b... Remove unnessary threads in messaging model
        protected void deactivate(ComponentContext ctx) {
-        // Close event publisher connections to message broker
-        EventPublisherPool.close(Util.Topics.TOPOLOGY_TOPIC.getTopicName());
+               // Close event publisher connections to message broker
+               
EventPublisherPool.close(Util.Topics.TOPOLOGY_TOPIC.getTopicName());
        }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/8012f8c8/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationTopicReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationTopicReceiver.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationTopicReceiver.java
index 87dfe6e..d65b7f5 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationTopicReceiver.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationTopicReceiver.java
@@ -29,45 +29,44 @@ import 
org.apache.stratos.messaging.message.receiver.applications.ApplicationsEv
 /**
  * This is to receive the application topic messages.
  */
-public class ApplicationTopicReceiver{
-    private static final Log log = 
LogFactory.getLog(ApplicationTopicReceiver.class);
-    private ApplicationsEventReceiver applicationsEventReceiver;
-    private boolean terminated;
+public class ApplicationTopicReceiver {
+       private static final Log log = 
LogFactory.getLog(ApplicationTopicReceiver.class);
+       private ApplicationsEventReceiver applicationsEventReceiver;
+       private boolean terminated;
 
-    public ApplicationTopicReceiver() {
-        this.applicationsEventReceiver = new ApplicationsEventReceiver();
-        addEventListeners();
+       public ApplicationTopicReceiver() {
+               this.applicationsEventReceiver = new 
ApplicationsEventReceiver();
+               addEventListeners();
 
-    }
+       }
 
-    
+       public void execute() {
 
-    public void execute() {
+               if (log.isInfoEnabled()) {
+                       log.info("Cloud controller application status thread 
started");
+               }
+               applicationsEventReceiver.execute();
 
-        if (log.isInfoEnabled()) {
-            log.info("Cloud controller application status thread started");
-        }
-           applicationsEventReceiver.execute();
+               if (log.isInfoEnabled()) {
+                       log.info("Cloud controller application status thread 
terminated");
+               }
 
-        if (log.isInfoEnabled()) {
-            log.info("Cloud controller application status thread terminated");
-        }
+       }
 
-    }
-    private void addEventListeners() {
-        applicationsEventReceiver.addEventListener(new 
ApplicationTerminatedEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-                //Remove the application related data
-                ApplicationTerminatedEvent terminatedEvent = 
(ApplicationTerminatedEvent)event;
-                log.info("ApplicationTerminatedEvent received for 
[application] " + terminatedEvent.getAppId());
-                String appId = terminatedEvent.getAppId();
-                TopologyBuilder.handleApplicationClustersRemoved(appId, 
terminatedEvent.getClusterData());
-            }
-        });
-    }
+       private void addEventListeners() {
+               applicationsEventReceiver.addEventListener(new 
ApplicationTerminatedEventListener() {
+                       @Override
+                       protected void onEvent(Event event) {
+                               //Remove the application related data
+                               ApplicationTerminatedEvent terminatedEvent = 
(ApplicationTerminatedEvent) event;
+                               log.info("ApplicationTerminatedEvent received 
for [application] " + terminatedEvent.getAppId());
+                               String appId = terminatedEvent.getAppId();
+                               
TopologyBuilder.handleApplicationClustersRemoved(appId, 
terminatedEvent.getClusterData());
+                       }
+               });
+       }
 
-    public void setTerminated(boolean terminated) {
-        this.terminated = terminated;
-    }
+       public void setTerminated(boolean terminated) {
+               this.terminated = terminated;
+       }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/8012f8c8/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
index bd2fbf0..ca6d4ad 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
@@ -26,16 +26,16 @@ import org.apache.stratos.messaging.event.cluster.status.*;
 import org.apache.stratos.messaging.listener.cluster.status.*;
 import 
org.apache.stratos.messaging.message.receiver.cluster.status.ClusterStatusEventReceiver;
 
-public class ClusterStatusTopicReceiver{
-    private static final Log log = 
LogFactory.getLog(ClusterStatusTopicReceiver.class);
+public class ClusterStatusTopicReceiver {
+       private static final Log log = 
LogFactory.getLog(ClusterStatusTopicReceiver.class);
 
-    private ClusterStatusEventReceiver statusEventReceiver;
-    private boolean terminated;
+       private ClusterStatusEventReceiver statusEventReceiver;
+       private boolean terminated;
 
-    public ClusterStatusTopicReceiver() {
-        this.statusEventReceiver = new ClusterStatusEventReceiver();
-        addEventListeners();
-    }
+       public ClusterStatusTopicReceiver() {
+               this.statusEventReceiver = new ClusterStatusEventReceiver();
+               addEventListeners();
+       }
 
        public void execute() {
 
@@ -47,58 +47,58 @@ public class ClusterStatusTopicReceiver{
        }
 
        private void addEventListeners() {
-        // Listen to topology events that affect clusters
-        statusEventReceiver.addEventListener(new 
ClusterStatusClusterResetEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-                
TopologyBuilder.handleClusterReset((ClusterStatusClusterResetEvent) event);
-            }
-        });
+               // Listen to topology events that affect clusters
+               statusEventReceiver.addEventListener(new 
ClusterStatusClusterResetEventListener() {
+                       @Override
+                       protected void onEvent(Event event) {
+                               
TopologyBuilder.handleClusterReset((ClusterStatusClusterResetEvent) event);
+                       }
+               });
 
-        statusEventReceiver.addEventListener(new 
ClusterStatusClusterInstanceCreatedEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-                
//TopologyBuilder.handleClusterInstanceCreated((ClusterStatusClusterInstanceCreatedEvent)
 event);
-            }
-        });
+               statusEventReceiver.addEventListener(new 
ClusterStatusClusterInstanceCreatedEventListener() {
+                       @Override
+                       protected void onEvent(Event event) {
+                               
//TopologyBuilder.handleClusterInstanceCreated((ClusterStatusClusterInstanceCreatedEvent)
 event);
+                       }
+               });
 
-        statusEventReceiver.addEventListener(new 
ClusterStatusClusterCreatedEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-                
TopologyBuilder.handleClusterCreated((ClusterStatusClusterCreatedEvent) event);
-            }
-        });
+               statusEventReceiver.addEventListener(new 
ClusterStatusClusterCreatedEventListener() {
+                       @Override
+                       protected void onEvent(Event event) {
+                               
TopologyBuilder.handleClusterCreated((ClusterStatusClusterCreatedEvent) event);
+                       }
+               });
 
-        statusEventReceiver.addEventListener(new 
ClusterStatusClusterActivatedEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-                
TopologyBuilder.handleClusterActivatedEvent((ClusterStatusClusterActivatedEvent)
 event);
-            }
-        });
+               statusEventReceiver.addEventListener(new 
ClusterStatusClusterActivatedEventListener() {
+                       @Override
+                       protected void onEvent(Event event) {
+                               
TopologyBuilder.handleClusterActivatedEvent((ClusterStatusClusterActivatedEvent)
 event);
+                       }
+               });
 
-        statusEventReceiver.addEventListener(new 
ClusterStatusClusterTerminatedEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-                
TopologyBuilder.handleClusterTerminatedEvent((ClusterStatusClusterTerminatedEvent)
 event);
-            }
-        });
+               statusEventReceiver.addEventListener(new 
ClusterStatusClusterTerminatedEventListener() {
+                       @Override
+                       protected void onEvent(Event event) {
+                               
TopologyBuilder.handleClusterTerminatedEvent((ClusterStatusClusterTerminatedEvent)
 event);
+                       }
+               });
 
-        statusEventReceiver.addEventListener(new 
ClusterStatusClusterTerminatingEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-                
TopologyBuilder.handleClusterTerminatingEvent((ClusterStatusClusterTerminatingEvent)
 event);
-            }
-        });
+               statusEventReceiver.addEventListener(new 
ClusterStatusClusterTerminatingEventListener() {
+                       @Override
+                       protected void onEvent(Event event) {
+                               
TopologyBuilder.handleClusterTerminatingEvent((ClusterStatusClusterTerminatingEvent)
 event);
+                       }
+               });
 
-        statusEventReceiver.addEventListener(new 
ClusterStatusClusterInactivateEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-                
TopologyBuilder.handleClusterInActivateEvent((ClusterStatusClusterInactivateEvent)
 event);
-            }
-        });
-    }
+               statusEventReceiver.addEventListener(new 
ClusterStatusClusterInactivateEventListener() {
+                       @Override
+                       protected void onEvent(Event event) {
+                               
TopologyBuilder.handleClusterInActivateEvent((ClusterStatusClusterInactivateEvent)
 event);
+                       }
+               });
+       }
 
-    public void setTerminated(boolean terminated) {
-        this.terminated = terminated;
-    }
+       public void setTerminated(boolean terminated) {
+               this.terminated = terminated;
+       }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/8012f8c8/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
index d5475f0..42aabed 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
@@ -35,71 +35,67 @@ import 
org.apache.stratos.messaging.message.receiver.instance.status.InstanceSta
 /**
  * This will handle the instance status events
  */
-public class InstanceStatusTopicReceiver{
-    private static final Log log = 
LogFactory.getLog(InstanceStatusTopicReceiver.class);
-
-    private InstanceStatusEventReceiver statusEventReceiver;
-    private boolean terminated;
-
-    public InstanceStatusTopicReceiver() {
-        this.statusEventReceiver = new InstanceStatusEventReceiver();
-        addEventListeners();
-    }
-
-
-
-    public void execute() {
-        statusEventReceiver.execute();
-        if (log.isInfoEnabled()) {
-            log.info("Cloud controller application status thread started");
-        }
-
-
-        if (log.isInfoEnabled()) {
-            log.info("Cloud controller application status thread terminated");
-        }
-    }
-
-    private void addEventListeners() {
-        statusEventReceiver.addEventListener(new 
InstanceActivatedEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-                TopologyBuilder.handleMemberActivated((InstanceActivatedEvent) 
event);
-            }
-        });
-
-        statusEventReceiver.addEventListener(new 
InstanceStartedEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-                TopologyBuilder.handleMemberStarted((InstanceStartedEvent) 
event);
-            }
-        });
-
-        statusEventReceiver.addEventListener(new 
InstanceReadyToShutdownEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-                try {
-                    
TopologyBuilder.handleMemberReadyToShutdown((InstanceReadyToShutdownEvent) 
event);
-                } catch (Exception e) {
-                    String error = "Failed to retrieve the instance status 
event message";
-                    log.error(error, e);
-                }
-            }
-        });
-
-        statusEventReceiver.addEventListener(new InstanceMaintenanceListener() 
{
-            @Override
-            protected void onEvent(Event event) {
-                try {
-                    
TopologyBuilder.handleMemberMaintenance((InstanceMaintenanceModeEvent) event);
-                } catch (Exception e) {
-                String error = "Failed to retrieve the instance status event 
message";
-                log.error(error, e);
-                }
-            }
-        });
-
-
-    }
+public class InstanceStatusTopicReceiver {
+       private static final Log log = 
LogFactory.getLog(InstanceStatusTopicReceiver.class);
+
+       private InstanceStatusEventReceiver statusEventReceiver;
+       private boolean terminated;
+
+       public InstanceStatusTopicReceiver() {
+               this.statusEventReceiver = new InstanceStatusEventReceiver();
+               addEventListeners();
+       }
+
+       public void execute() {
+               statusEventReceiver.execute();
+               if (log.isInfoEnabled()) {
+                       log.info("Cloud controller application status thread 
started");
+               }
+
+               if (log.isInfoEnabled()) {
+                       log.info("Cloud controller application status thread 
terminated");
+               }
+       }
+
+       private void addEventListeners() {
+               statusEventReceiver.addEventListener(new 
InstanceActivatedEventListener() {
+                       @Override
+                       protected void onEvent(Event event) {
+                               
TopologyBuilder.handleMemberActivated((InstanceActivatedEvent) event);
+                       }
+               });
+
+               statusEventReceiver.addEventListener(new 
InstanceStartedEventListener() {
+                       @Override
+                       protected void onEvent(Event event) {
+                               
TopologyBuilder.handleMemberStarted((InstanceStartedEvent) event);
+                       }
+               });
+
+               statusEventReceiver.addEventListener(new 
InstanceReadyToShutdownEventListener() {
+                       @Override
+                       protected void onEvent(Event event) {
+                               try {
+                                       
TopologyBuilder.handleMemberReadyToShutdown((InstanceReadyToShutdownEvent) 
event);
+                               } catch (Exception e) {
+                                       String error = "Failed to retrieve the 
instance status event message";
+                                       log.error(error, e);
+                               }
+                       }
+               });
+
+               statusEventReceiver.addEventListener(new 
InstanceMaintenanceListener() {
+                       @Override
+                       protected void onEvent(Event event) {
+                               try {
+                                       
TopologyBuilder.handleMemberMaintenance((InstanceMaintenanceModeEvent) event);
+                               } catch (Exception e) {
+                                       String error = "Failed to retrieve the 
instance status event message";
+                                       log.error(error, e);
+                               }
+                       }
+               });
+
+       }
 
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/8012f8c8/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
 
b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
index e74721f..188b2ac 100644
--- 
a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
+++ 
b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
@@ -35,136 +35,134 @@ import java.util.concurrent.ExecutorService;
  * received from the message broker.
  */
 public class LoadBalancerExtension implements Runnable {
-    private static final Log log = 
LogFactory.getLog(LoadBalancerExtension.class);
-
-    private LoadBalancer loadBalancer;
-    private LoadBalancerStatisticsReader statsReader;
-    private boolean loadBalancerStarted;
-    private TopologyEventReceiver topologyEventReceiver;
-    private LoadBalancerStatisticsNotifier statisticsNotifier;
-    private boolean terminated;
+       private static final Log log = 
LogFactory.getLog(LoadBalancerExtension.class);
+
+       private LoadBalancer loadBalancer;
+       private LoadBalancerStatisticsReader statsReader;
+       private boolean loadBalancerStarted;
+       private TopologyEventReceiver topologyEventReceiver;
+       private LoadBalancerStatisticsNotifier statisticsNotifier;
+       private boolean terminated;
        private ExecutorService executorService;
-    /**
-     * Load balancer extension constructor.
-     * @param loadBalancer Load balancer instance: Mandatory.
-     * @param statsReader Statistics reader: If null statistics notifier 
thread will not be started.
-     */
-    public LoadBalancerExtension(LoadBalancer loadBalancer, 
LoadBalancerStatisticsReader statsReader) {
-        this.loadBalancer = loadBalancer;
-        this.statsReader = statsReader;
-    }
-
-    @Override
-    public void run() {
-        try {
-            if(log.isInfoEnabled()) {
-                log.info("Load balancer extension started");
-            }
-
-            // Start topology receiver thread
-            topologyEventReceiver = new TopologyEventReceiver();
-            addEventListeners();
-               topologyEventReceiver.setExecutorService(executorService);
-               topologyEventReceiver.execute();
-
-
-            if(statsReader != null) {
-                // Start stats notifier thread
-                statisticsNotifier = new 
LoadBalancerStatisticsNotifier(statsReader);
-                Thread statsNotifierThread = new Thread(statisticsNotifier);
-                statsNotifierThread.start();
-            }
-            else {
-                if(log.isWarnEnabled()) {
-                    log.warn("Load balancer statistics reader not found");
-                }
-            }
-
-            // Keep the thread live until terminated
-            while (!terminated);
-        } catch (Exception e) {
-            if (log.isErrorEnabled()) {
-                log.error("Could not start load balancer extension", e);
-            }
-        }
-    }
-
-    private void addEventListeners() {
-        topologyEventReceiver.addEventListener(new 
CompleteTopologyEventListener() {
-
-            @Override
-            protected void onEvent(Event event) {
-                try {
-
-                    if (!loadBalancerStarted) {
-                        // Configure load balancer
-                        loadBalancer.configure(TopologyManager.getTopology());
-
-                        // Start load balancer
-                        loadBalancer.start();
-                        loadBalancerStarted = true;
-                    }
-                } catch (Exception e) {
-                    if (log.isErrorEnabled()) {
-                        log.error("Could not start load balancer", e);
-                    }
-                    terminate();
-                }
-            }
-        });
-        topologyEventReceiver.addEventListener(new 
MemberActivatedEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-                reloadConfiguration();
-            }
-        });
-        topologyEventReceiver.addEventListener(new 
MemberSuspendedEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-                reloadConfiguration();
-            }
-        });
-        topologyEventReceiver.addEventListener(new 
MemberTerminatedEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-                reloadConfiguration();
-            }
-        });
-        topologyEventReceiver.addEventListener(new 
ClusterRemovedEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-                reloadConfiguration();
-            }
-        });
-        topologyEventReceiver.addEventListener(new 
ServiceRemovedEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-                reloadConfiguration();
-            }
-        });
-    }
-
-    private void reloadConfiguration() {
-        try {
-            if (loadBalancerStarted) {
-                loadBalancer.reload(TopologyManager.getTopology());
-            }
-        } catch (Exception e) {
-            if (log.isErrorEnabled()) {
-                log.error("Could not reload load balancer configuration", e);
-            }
-        }
-    }
-
-    public void terminate() {
-        if(topologyEventReceiver != null) {
-            topologyEventReceiver.terminate();
-        }
-        if(statisticsNotifier != null) {
-            statisticsNotifier.terminate();
-        }
-        terminated = true;
-    }
+
+       /**
+        * Load balancer extension constructor.
+        *
+        * @param loadBalancer Load balancer instance: Mandatory.
+        * @param statsReader  Statistics reader: If null statistics notifier 
thread will not be started.
+        */
+       public LoadBalancerExtension(LoadBalancer loadBalancer, 
LoadBalancerStatisticsReader statsReader) {
+               this.loadBalancer = loadBalancer;
+               this.statsReader = statsReader;
+       }
+
+       @Override
+       public void run() {
+               try {
+                       if (log.isInfoEnabled()) {
+                               log.info("Load balancer extension started");
+                       }
+
+                       // Start topology receiver thread
+                       topologyEventReceiver = new TopologyEventReceiver();
+                       addEventListeners();
+                       
topologyEventReceiver.setExecutorService(executorService);
+                       topologyEventReceiver.execute();
+
+                       if (statsReader != null) {
+                               // Start stats notifier thread
+                               statisticsNotifier = new 
LoadBalancerStatisticsNotifier(statsReader);
+                               Thread statsNotifierThread = new 
Thread(statisticsNotifier);
+                               statsNotifierThread.start();
+                       } else {
+                               if (log.isWarnEnabled()) {
+                                       log.warn("Load balancer statistics 
reader not found");
+                               }
+                       }
+
+               } catch (Exception e) {
+                       if (log.isErrorEnabled()) {
+                               log.error("Could not start load balancer 
extension", e);
+                       }
+               }
+       }
+
+       private void addEventListeners() {
+               topologyEventReceiver.addEventListener(new 
CompleteTopologyEventListener() {
+
+                       @Override
+                       protected void onEvent(Event event) {
+                               try {
+
+                                       if (!loadBalancerStarted) {
+                                               // Configure load balancer
+                                               
loadBalancer.configure(TopologyManager.getTopology());
+
+                                               // Start load balancer
+                                               loadBalancer.start();
+                                               loadBalancerStarted = true;
+                                       }
+                               } catch (Exception e) {
+                                       if (log.isErrorEnabled()) {
+                                               log.error("Could not start load 
balancer", e);
+                                       }
+                                       terminate();
+                               }
+                       }
+               });
+               topologyEventReceiver.addEventListener(new 
MemberActivatedEventListener() {
+                       @Override
+                       protected void onEvent(Event event) {
+                               reloadConfiguration();
+                       }
+               });
+               topologyEventReceiver.addEventListener(new 
MemberSuspendedEventListener() {
+                       @Override
+                       protected void onEvent(Event event) {
+                               reloadConfiguration();
+                       }
+               });
+               topologyEventReceiver.addEventListener(new 
MemberTerminatedEventListener() {
+                       @Override
+                       protected void onEvent(Event event) {
+                               reloadConfiguration();
+                       }
+               });
+               topologyEventReceiver.addEventListener(new 
ClusterRemovedEventListener() {
+                       @Override
+                       protected void onEvent(Event event) {
+                               reloadConfiguration();
+                       }
+               });
+               topologyEventReceiver.addEventListener(new 
ServiceRemovedEventListener() {
+                       @Override
+                       protected void onEvent(Event event) {
+                               reloadConfiguration();
+                       }
+               });
+       }
+
+       private void reloadConfiguration() {
+               try {
+                       if (loadBalancerStarted) {
+                               
loadBalancer.reload(TopologyManager.getTopology());
+                       }
+               } catch (Exception e) {
+                       if (log.isErrorEnabled()) {
+                               log.error("Could not reload load balancer 
configuration", e);
+                       }
+               }
+       }
+
+       public void terminate() {
+               if (topologyEventReceiver != null) {
+                       topologyEventReceiver.terminate();
+               }
+               if (statisticsNotifier != null) {
+                       statisticsNotifier.terminate();
+               }
+               terminated = true;
+       }
 
        public ExecutorService getExecutorService() {
                return executorService;

http://git-wip-us.apache.org/repos/asf/stratos/blob/8012f8c8/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
----------------------------------------------------------------------
diff --git 
a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
 
b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
index 56a3fcf..5ee28d1 100644
--- 
a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
+++ 
b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
@@ -58,53 +58,56 @@ import java.util.concurrent.TimeUnit;
 @SiddhiExtension(namespace = "stratos", function = "faultHandling")
 public class FaultHandlingWindowProcessor extends WindowProcessor implements 
RunnableWindowProcessor {
 
-    private static final int TIME_OUT = 60 * 1000;
-    static final Logger log = 
Logger.getLogger(FaultHandlingWindowProcessor.class);
-    private ScheduledExecutorService faultHandleScheduler;
-    private ThreadBarrier threadBarrier;
-    private long timeToKeep;
-    private ISchedulerSiddhiQueue<StreamEvent> window;
-    private EventPublisher healthStatPublisher = 
EventPublisherPool.getPublisher(Util.Topics.HEALTH_STAT_TOPIC.getTopicName());
-    private Map<String, Object> MemberFaultEventMap = new HashMap<String, 
Object>();
-    private Map<String, Object> memberFaultEventMessageMap = new 
HashMap<String, Object>();
-
-    // Map of member id's to their last received health event time stamp
-    private ConcurrentHashMap<String, Long> memberTimeStampMap = new 
ConcurrentHashMap<String, Long>();
-
-    // Event receiver to receive topology events published by cloud-controller
-    private CEPTopologyEventReceiver cepTopologyEventReceiver = new 
CEPTopologyEventReceiver(this);
-
-    // Stratos member id attribute index in stream execution plan
-    private int memberIdAttrIndex;
-
-    @Override
-    protected void processEvent(InEvent event) {
-        addDataToMap(event);
-    }
-
-    @Override
-    protected void processEvent(InListEvent listEvent) {
-        for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) {
-            addDataToMap((InEvent) listEvent.getEvent(i));
-        }
-    }
-
-    /**
-     * Add new entry to time stamp map from the received event.
-     *
-     * @param event Event received by Siddhi.
-     */
-    protected void addDataToMap(InEvent event) {
-        String id = (String) event.getData()[memberIdAttrIndex];
-        //checking whether this member is the topology.
-        //sometimes there can be a delay between publishing member terminated 
events
-        //and actually terminating instances. Hence CEP might get events for 
already terminated members
-        //so we are checking the topology for the member existence
-        Member member = getMemberFromId(id);
-        if (null == member) {
+       private static final int TIME_OUT = 60 * 1000;
+       static final Logger log = 
Logger.getLogger(FaultHandlingWindowProcessor.class);
+       public static final String IDENTIFIER = "AutoScaler";
+       private ScheduledExecutorService faultHandleScheduler;
+       private ThreadBarrier threadBarrier;
+       private long timeToKeep;
+       private ISchedulerSiddhiQueue<StreamEvent> window;
+       private EventPublisher healthStatPublisher =
+                       
EventPublisherPool.getPublisher(Util.Topics.HEALTH_STAT_TOPIC.getTopicName());
+       private Map<String, Object> MemberFaultEventMap = new HashMap<String, 
Object>();
+       private Map<String, Object> memberFaultEventMessageMap = new 
HashMap<String, Object>();
+
+       // Map of member id's to their last received health event time stamp
+       private ConcurrentHashMap<String, Long> memberTimeStampMap = new 
ConcurrentHashMap<String, Long>();
+
+       // Event receiver to receive topology events published by 
cloud-controller
+       private CEPTopologyEventReceiver cepTopologyEventReceiver = new 
CEPTopologyEventReceiver(this);
+
+       // Stratos member id attribute index in stream execution plan
+       private int memberIdAttrIndex;
+
+       @Override
+       protected void processEvent(InEvent event) {
+               addDataToMap(event);
+       }
+
+       @Override
+       protected void processEvent(InListEvent listEvent) {
+               for (int i = 0, size = listEvent.getActiveEvents(); i < size; 
i++) {
+                       addDataToMap((InEvent) listEvent.getEvent(i));
+               }
+       }
+
+       /**
+        * Add new entry to time stamp map from the received event.
+        *
+        * @param event Event received by Siddhi.
+        */
+       protected void addDataToMap(InEvent event) {
+               String id = (String) event.getData()[memberIdAttrIndex];
+               //checking whether this member is the topology.
+               //sometimes there can be a delay between publishing member 
terminated events
+               //and actually terminating instances. Hence CEP might get 
events for already terminated members
+               //so we are checking the topology for the member existence
+               Member member = getMemberFromId(id);
+               if (null == member) {
                        log.debug("Member not found in the toplogy. Event 
rejected");
                        return;
                }
+<<<<<<< HEAD
         if (StringUtils.isNotEmpty(id)) {
             memberTimeStampMap.put(id, event.getTimeStamp());
         } else {
@@ -317,4 +320,220 @@ public class FaultHandlingWindowProcessor extends 
WindowProcessor implements Run
     public ConcurrentHashMap<String, Long> getMemberTimeStampMap() {
         return memberTimeStampMap;
     }
+=======
+               if (StringUtils.isNotEmpty(id)) {
+                       memberTimeStampMap.put(id, event.getTimeStamp());
+               } else {
+                       log.warn("NULL member id found in the event received. 
Event rejected.");
+               }
+               if (log.isDebugEnabled()) {
+                       log.debug("Event received from [member-id] " + id + " 
[time-stamp] " + event.getTimeStamp());
+               }
+       }
+
+       @Override
+       public Iterator<StreamEvent> iterator() {
+               return window.iterator();
+       }
+
+       @Override
+       public Iterator<StreamEvent> iterator(String predicate) {
+               if (siddhiContext.isDistributedProcessingEnabled()) {
+                       return ((SchedulerSiddhiQueueGrid<StreamEvent>) 
window).iterator(predicate);
+               } else {
+                       return window.iterator();
+               }
+       }
+
+       /**
+        * Retrieve the currently activated members from the topology and 
initialize the time stamp map.
+        * This will allow the system to recover from a restart.
+        *
+        * @param topology Topology model object
+        */
+       boolean loadTimeStampMapFromTopology(Topology topology) {
+
+               long currentTimeStamp = System.currentTimeMillis();
+               if (topology == null || topology.getServices() == null) {
+                       return false;
+               }
+               // TODO make this efficient by adding APIs to messaging 
component
+               for (Service service : topology.getServices()) {
+                       if (service.getClusters() != null) {
+                               for (Cluster cluster : service.getClusters()) {
+                                       if (cluster.getMembers() != null) {
+                                               for (Member member : 
cluster.getMembers()) {
+                                                       // we are checking 
faulty status only in previously activated members
+                                                       if (member != null && 
MemberStatus.Activated.equals(member.getStatus())) {
+                                                               // Initialize 
the member time stamp map from the topology at the beginning
+                                                               
memberTimeStampMap.putIfAbsent(member.getMemberId(), currentTimeStamp);
+                                                       }
+                                               }
+                                       }
+                               }
+                       }
+               }
+
+               log.info("Member time stamp map was successfully loaded from 
the topology.");
+               if (log.isDebugEnabled()) {
+                       log.debug("Member TimeStamp Map: " + 
memberTimeStampMap);
+               }
+               return true;
+       }
+
+       private Member getMemberFromId(String memberId) {
+               if (StringUtils.isEmpty(memberId)) {
+                       return null;
+               }
+               if (TopologyManager.getTopology().isInitialized()) {
+                       try {
+                               TopologyManager.acquireReadLock();
+                               if (TopologyManager.getTopology().getServices() 
== null) {
+                                       return null;
+                               }
+                               // TODO make this efficient by adding APIs to 
messaging component
+                               for (Service service : 
TopologyManager.getTopology().getServices()) {
+                                       if (service.getClusters() != null) {
+                                               for (Cluster cluster : 
service.getClusters()) {
+                                                       if 
(cluster.getMembers() != null) {
+                                                               for (Member 
member : cluster.getMembers()) {
+                                                                       if 
(memberId.equals(member.getMemberId())) {
+                                                                               
return member;
+                                                                       }
+                                                               }
+                                                       }
+                                               }
+                                       }
+                               }
+                       } catch (Exception e) {
+                               log.error("Error while reading topology" + e);
+                       } finally {
+                               TopologyManager.releaseReadLock();
+                       }
+               }
+               return null;
+       }
+
+       private void publishMemberFault(String memberId) {
+               Member member = getMemberFromId(memberId);
+               if (member == null) {
+                       log.error("Failed to publish member fault event. Member 
having [member-id] " + memberId +
+                                 " does not exist in topology");
+                       return;
+               }
+               log.info("Publishing member fault event for [member-id] " + 
memberId);
+
+               MemberFaultEvent memberFaultEvent =
+                               new MemberFaultEvent(member.getClusterId(), 
member.getInstanceId(), member.getMemberId(),
+                                                    member.getPartitionId(), 
0);
+
+               memberFaultEventMessageMap.put("message", memberFaultEvent);
+               healthStatPublisher.publish(MemberFaultEventMap, true);
+       }
+
+       @Override
+       public void run() {
+               try {
+                       threadBarrier.pass();
+
+                       for (Object o : memberTimeStampMap.entrySet()) {
+                               Map.Entry pair = (Map.Entry) o;
+                               long currentTime = System.currentTimeMillis();
+                               Long eventTimeStamp = (Long) pair.getValue();
+
+                               if ((currentTime - eventTimeStamp) > TIME_OUT) {
+                                       log.info("Faulty member detected 
[member-id] " + pair.getKey() + " with [last time-stamp] " +
+                                                eventTimeStamp + " [time-out] 
" + TIME_OUT + " milliseconds");
+                                       publishMemberFault((String) 
pair.getKey());
+                               }
+                       }
+                       if (log.isDebugEnabled()) {
+                               log.debug("Fault handling processor iteration 
completed with [time-stamp map length] " +
+                                         memberTimeStampMap.size() + " 
[time-stamp map] " + memberTimeStampMap);
+                       }
+               } catch (Throwable t) {
+                       log.error(t.getMessage(), t);
+               } finally {
+                       faultHandleScheduler.schedule(this, timeToKeep, 
TimeUnit.MILLISECONDS);
+               }
+       }
+
+       @Override
+       protected Object[] currentState() {
+               return new Object[] { window.currentState() };
+       }
+
+       @Override
+       protected void restoreState(Object[] data) {
+               window.restoreState(data);
+               window.restoreState((Object[]) data[0]);
+               window.reSchedule();
+       }
+
+       @Override
+       protected void init(Expression[] parameters, QueryPostProcessingElement 
nextProcessor,
+                           AbstractDefinition streamDefinition, String 
elementId, boolean async,
+                           SiddhiContext siddhiContext) {
+
+               if (parameters[0] instanceof IntConstant) {
+                       timeToKeep = ((IntConstant) parameters[0]).getValue();
+               } else {
+                       timeToKeep = ((LongConstant) parameters[0]).getValue();
+               }
+
+               String memberIdAttrName = ((Variable) 
parameters[1]).getAttributeName();
+               memberIdAttrIndex = 
streamDefinition.getAttributePosition(memberIdAttrName);
+
+               if (this.siddhiContext.isDistributedProcessingEnabled()) {
+                       window = new 
SchedulerSiddhiQueueGrid<StreamEvent>(elementId, this, this.siddhiContext, 
this.async);
+               } else {
+                       window = new SchedulerSiddhiQueue<StreamEvent>(this);
+               }
+               MemberFaultEventMap
+                               
.put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent", 
memberFaultEventMessageMap);
+
+               ExecutorService executorService = 
StratosThreadPool.getExecutorService(IDENTIFIER, 10);
+               cepTopologyEventReceiver.setExecutorService(executorService);
+               executorService.execute(cepTopologyEventReceiver);
+
+               //Ordinary scheduling
+               window.schedule();
+               if (log.isDebugEnabled()) {
+                       log.debug("Fault handling window processor initialized 
with [timeToKeep] " + timeToKeep +
+                                 ", [memberIdAttrName] " + memberIdAttrName + 
", [memberIdAttrIndex] " + memberIdAttrIndex +
+                                 ", [distributed-enabled] " + 
this.siddhiContext.isDistributedProcessingEnabled());
+               }
+       }
+
+       @Override
+       public void schedule() {
+               faultHandleScheduler.schedule(this, timeToKeep, 
TimeUnit.MILLISECONDS);
+       }
+
+       @Override
+       public void scheduleNow() {
+               faultHandleScheduler.schedule(this, 0, TimeUnit.MILLISECONDS);
+       }
+
+       @Override
+       public void setScheduledExecutorService(ScheduledExecutorService 
scheduledExecutorService) {
+               this.faultHandleScheduler = scheduledExecutorService;
+       }
+
+       @Override
+       public void setThreadBarrier(ThreadBarrier threadBarrier) {
+               this.threadBarrier = threadBarrier;
+       }
+
+       @Override
+       public void destroy() {
+               // terminate topology listener thread
+               cepTopologyEventReceiver.terminate();
+               window = null;
+       }
+
+       public ConcurrentHashMap<String, Long> getMemberTimeStampMap() {
+               return memberTimeStampMap;
+       }
+>>>>>>> ddf277b... Remove unnessary threads in messaging model
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/8012f8c8/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java
----------------------------------------------------------------------
diff --git 
a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java
 
b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java
index 013aee9..7996672 100644
--- 
a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java
+++ 
b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java
@@ -31,32 +31,34 @@ import java.util.concurrent.ExecutorService;
  * HAProxy extension main class.
  */
 public class Main {
-    private static final Log log = LogFactory.getLog(Main.class);
+       private static final Log log = LogFactory.getLog(Main.class);
        private static ExecutorService executorService;
 
        public static void main(String[] args) {
 
-        LoadBalancerExtension extension = null;
-        try {
-            // Configure log4j properties
-            
PropertyConfigurator.configure(System.getProperty("log4j.properties.file.path"));
+               LoadBalancerExtension extension = null;
+               try {
+                       // Configure log4j properties
+                       
PropertyConfigurator.configure(System.getProperty("log4j.properties.file.path"));
 
-            if (log.isInfoEnabled()) {
-                log.info("HAProxy extension started");
-            }
-               executorService = 
StratosThreadPool.getExecutorService("Load_Balance_Extension", 10);
-            // Validate runtime parameters
-            HAProxyContext.getInstance().validate();
-            extension = new LoadBalancerExtension(new HAProxy(), 
(HAProxyContext.getInstance().isCEPStatsPublisherEnabled() ? new 
HAProxyStatisticsReader() : null));
-            Thread thread = new Thread(extension);
-            thread.start();
-        } catch (Exception e) {
-            if (log.isErrorEnabled()) {
-                log.error(e);
-            }
-            if (extension != null) {
-                extension.terminate();
-            }
-        }
-    }
+                       if (log.isInfoEnabled()) {
+                               log.info("HAProxy extension started");
+                       }
+                       executorService = 
StratosThreadPool.getExecutorService("Load_Balance_Extension", 10);
+                       // Validate runtime parameters
+                       HAProxyContext.getInstance().validate();
+                       extension = new LoadBalancerExtension(new HAProxy(),
+                                                             
(HAProxyContext.getInstance().isCEPStatsPublisherEnabled() ?
+                                                              new 
HAProxyStatisticsReader() : null));
+                       Thread thread = new Thread(extension);
+                       thread.start();
+               } catch (Exception e) {
+                       if (log.isErrorEnabled()) {
+                               log.error(e);
+                       }
+                       if (extension != null) {
+                               extension.terminate();
+                       }
+               }
+       }
 }

Reply via email to