merge with new changes

Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/f70aa9ed
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/f70aa9ed
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/f70aa9ed

Branch: refs/heads/master
Commit: f70aa9edc4d0e32d3b726b99eca217dfd4d3a475
Parents: 4e73393
Author: gayan <[email protected]>
Authored: Tue Dec 2 17:42:21 2014 +0530
Committer: gayan <[email protected]>
Committed: Tue Dec 2 17:42:21 2014 +0530

----------------------------------------------------------------------
 .../AutoscalerTopologyEventReceiver.java        | 498 +------------------
 .../internal/AutoscalerServerComponent.java     | 230 ++-------
 .../CloudControllerServiceComponent.java        | 143 +-----
 .../internal/LoadBalancerServiceComponent.java  |  29 +-
 .../extension/FaultHandlingWindowProcessor.java | 217 --------
 5 files changed, 87 insertions(+), 1030 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/f70aa9ed/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
index 1f14542..f4a5169 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
@@ -27,8 +27,11 @@ import 
org.apache.stratos.autoscaler.context.cluster.ClusterContextFactory;
 import org.apache.stratos.autoscaler.context.cluster.VMClusterContext;
 import 
org.apache.stratos.autoscaler.event.publisher.ClusterStatusEventPublisher;
 import 
org.apache.stratos.autoscaler.event.publisher.InstanceNotificationPublisher;
+import 
org.apache.stratos.autoscaler.exception.application.DependencyBuilderException;
+import 
org.apache.stratos.autoscaler.exception.application.TopologyInConsistentException;
 import 
org.apache.stratos.autoscaler.exception.partition.PartitionValidationException;
 import 
org.apache.stratos.autoscaler.exception.policy.PolicyValidationException;
+import org.apache.stratos.autoscaler.monitor.MonitorFactory;
 import org.apache.stratos.autoscaler.monitor.cluster.AbstractClusterMonitor;
 import org.apache.stratos.autoscaler.monitor.component.ApplicationMonitor;
 import org.apache.stratos.autoscaler.monitor.events.ClusterStatusEvent;
@@ -63,34 +66,22 @@ public class AutoscalerTopologyEventReceiver {
        private boolean topologyInitialized;
        private ExecutorService executorService;
 
-<<<<<<< HEAD
-    public AutoscalerTopologyEventReceiver() {
-        this.topologyEventReceiver = new TopologyEventReceiver();
-        addEventListeners();
-    }
-
+       public AutoscalerTopologyEventReceiver() {
+               this.topologyEventReceiver = new TopologyEventReceiver();
+               addEventListeners();
+       }
 
-    public void execute() {
-        //FIXME this activated before autoscaler deployer activated.
+       public void execute() {
+               //FIXME this activated before autoscaler deployer activated.
 
-           topologyEventReceiver.setExecutorService(getExecutorService());
-           topologyEventReceiver.execute();
+               topologyEventReceiver.setExecutorService(getExecutorService());
+               topologyEventReceiver.execute();
 
-           if (log.isInfoEnabled()) {
-            log.info("Autoscaler topology receiver thread started");
-        }
+               if (log.isInfoEnabled()) {
+                       log.info("Autoscaler topology receiver thread started");
+               }
 
-        // Keep the thread live until terminated
-        while (!terminated) {
-            try {
-                Thread.sleep(1000);
-            } catch (InterruptedException ignore) {
-            }
-        }
-        if (log.isInfoEnabled()) {
-            log.info("Autoscaler topology receiver thread terminated");
-        }
-    }
+       }
 
     private boolean allClustersInitialized(Application application) {
         boolean allClustersInitialized = false;
@@ -524,461 +515,6 @@ public class AutoscalerTopologyEventReceiver {
         topologyEventReceiver.terminate();
         terminated = true;
     }
-=======
-       public AutoscalerTopologyEventReceiver() {
-               this.topologyEventReceiver = new TopologyEventReceiver();
-               addEventListeners();
-       }
-
-       public void execute() {
-               //FIXME this activated before autoscaler deployer activated.
-
-               topologyEventReceiver.setExecutorService(executorService);
-               topologyEventReceiver.execute();
-
-               if (log.isInfoEnabled()) {
-                       log.info("Autoscaler topology receiver thread started");
-               }
-
-       }
-
-       private boolean allClustersInitialized(Application application) {
-               boolean allClustersInitialized = false;
-               for (ClusterDataHolder holder : 
application.getClusterDataRecursively()) {
-                       
TopologyManager.acquireReadLockForCluster(holder.getServiceType(),
-                                                                 
holder.getClusterId());
-
-                       try {
-                               Topology topology = 
TopologyManager.getTopology();
-                               if (topology != null) {
-                                       Service service = 
topology.getService(holder.getServiceType());
-                                       if (service != null) {
-                                               if 
(service.clusterExists(holder.getClusterId())) {
-                                                       allClustersInitialized 
= true;
-                                                       return 
allClustersInitialized;
-                                               } else {
-                                                       if 
(log.isDebugEnabled()) {
-                                                               
log.debug("[Cluster] " + holder.getClusterId() + " is not found in " +
-                                                                         "the 
Topology");
-                                                       }
-                                                       allClustersInitialized 
= false;
-                                               }
-                                       } else {
-                                               if (log.isDebugEnabled()) {
-                                                       log.debug("Service is 
null in the CompleteTopologyEvent");
-                                               }
-                                       }
-                               } else {
-                                       if (log.isDebugEnabled()) {
-                                               log.debug("Topology is null in 
the CompleteTopologyEvent");
-                                       }
-                               }
-                       } finally {
-                               
TopologyManager.releaseReadLockForCluster(holder.getServiceType(),
-                                                                         
holder.getClusterId());
-                       }
-               }
-               return allClustersInitialized;
-       }
-
-       private void addEventListeners() {
-               // Listen to topology events that affect clusters
-               topologyEventReceiver.addEventListener(new 
CompleteTopologyEventListener() {
-                       @Override
-                       protected void onEvent(Event event) {
-                               if (!topologyInitialized) {
-                                       log.info("[CompleteTopologyEvent] 
Received: " + event.getClass());
-                                       ApplicationHolder.acquireReadLock();
-                                       try {
-                                               Applications applications = 
ApplicationHolder.getApplications();
-                                               if (applications != null) {
-                                                       for (Application 
application : applications.getApplications().values()) {
-                                                               if 
(allClustersInitialized(application)) {
-                                                                       
startApplicationMonitor(application.getUniqueIdentifier());
-                                                               } else {
-                                                                       
log.error("Complete Topology is not consistent with the applications " +
-                                                                               
  "which got persisted");
-                                                               }
-                                                       }
-                                                       topologyInitialized = 
true;
-                                               } else {
-                                                       log.info("No 
applications found in the complete topology");
-                                               }
-                                       } catch (Exception e) {
-                                               log.error("Error processing 
event", e);
-                                       } finally {
-                                               
ApplicationHolder.releaseReadLock();
-                                       }
-                               }
-                       }
-               });
-
-               topologyEventReceiver.addEventListener(new 
ApplicationClustersCreatedEventListener() {
-                       @Override
-                       protected void onEvent(Event event) {
-                               try {
-                                       
log.info("[ApplicationClustersCreatedEvent] Received: " + event.getClass());
-                                       ApplicationClustersCreatedEvent 
applicationClustersCreatedEvent =
-                                                       
(ApplicationClustersCreatedEvent) event;
-                                       String appId = 
applicationClustersCreatedEvent.getAppId();
-                                       try {
-                                               //acquire read lock
-                                               
ApplicationHolder.acquireReadLock();
-                                               //start the application monitor
-                                               startApplicationMonitor(appId);
-                                       } catch (Exception e) {
-                                               String msg = "Error processing 
event " + e.getLocalizedMessage();
-                                               log.error(msg, e);
-                                       } finally {
-                                               //release read lock
-                                               
ApplicationHolder.releaseReadLock();
-
-                                       }
-                               } catch (ClassCastException e) {
-                                       String msg = "Error while casting the 
event " + e.getLocalizedMessage();
-                                       log.error(msg, e);
-                               }
-
-                       }
-               });
-
-               topologyEventReceiver.addEventListener(new 
ClusterActivatedEventListener() {
-                       @Override
-                       protected void onEvent(Event event) {
-                               log.info("[ClusterActivatedEvent] Received: " + 
event.getClass());
-                               ClusterActivatedEvent clusterActivatedEvent = 
(ClusterActivatedEvent) event;
-                               String clusterId = 
clusterActivatedEvent.getClusterId();
-                               AutoscalerContext asCtx = 
AutoscalerContext.getInstance();
-                               AbstractClusterMonitor monitor;
-                               monitor = asCtx.getClusterMonitor(clusterId);
-                               if (null == monitor) {
-                                       if (log.isDebugEnabled()) {
-                                               log.debug(String.format("A 
cluster monitor is not found in autoscaler context "
-                                                                       + 
"[cluster] %s", clusterId));
-                                       }
-                                       return;
-                               }
-                               //changing the status in the monitor, will 
notify its parent monitor
-
-                       }
-               });
-
-               topologyEventReceiver.addEventListener(new 
ClusterResetEventListener() {
-                       @Override
-                       protected void onEvent(Event event) {
-                               log.info("[ClusterCreatedEvent] Received: " + 
event.getClass());
-                               ClusterResetEvent clusterResetEvent = 
(ClusterResetEvent) event;
-                               String clusterId = 
clusterResetEvent.getClusterId();
-                               AutoscalerContext asCtx = 
AutoscalerContext.getInstance();
-                               AbstractClusterMonitor monitor;
-                               monitor = asCtx.getClusterMonitor(clusterId);
-                               if (null == monitor) {
-                                       if (log.isDebugEnabled()) {
-                                               log.debug(String.format("A 
cluster monitor is not found in autoscaler context "
-                                                                       + 
"[cluster] %s", clusterId));
-                                       }
-                                       return;
-                               }
-                               //changing the status in the monitor, will 
notify its parent monitor
-                               monitor.destroy();
-                               monitor.setStatus(ClusterStatus.Created);
-
-                       }
-               });
-
-               topologyEventReceiver.addEventListener(new 
ClusterCreatedEventListener() {
-                       @Override
-                       protected void onEvent(Event event) {
-                               log.info("[ClusterCreatedEvent] Received: " + 
event.getClass());
-                       }
-               });
-
-               topologyEventReceiver.addEventListener(new 
ClusterInActivateEventListener() {
-                       @Override
-                       protected void onEvent(Event event) {
-                               log.info("[ClusterInActivateEvent] Received: " 
+ event.getClass());
-                               ClusterInactivateEvent clusterInactivateEvent = 
(ClusterInactivateEvent) event;
-                               String clusterId = 
clusterInactivateEvent.getClusterId();
-                               AutoscalerContext asCtx = 
AutoscalerContext.getInstance();
-                               AbstractClusterMonitor monitor;
-                               monitor = asCtx.getClusterMonitor(clusterId);
-                               if (null == monitor) {
-                                       if (log.isDebugEnabled()) {
-                                               log.debug(String.format("A 
cluster monitor is not found in autoscaler context "
-                                                                       + 
"[cluster] %s", clusterId));
-                                       }
-                                       return;
-                               }
-                               //changing the status in the monitor, will 
notify its parent monitor
-                               monitor.setStatus(ClusterStatus.Inactive);
-                       }
-               });
-
-               topologyEventReceiver.addEventListener(new 
ClusterTerminatingEventListener() {
-                       @Override
-                       protected void onEvent(Event event) {
-                               log.info("[ClusterTerminatingEvent] Received: " 
+ event.getClass());
-                               ClusterTerminatingEvent clusterTerminatingEvent 
= (ClusterTerminatingEvent) event;
-                               String clusterId = 
clusterTerminatingEvent.getClusterId();
-                               String instanceId = 
clusterTerminatingEvent.getInstanceId();
-                               AutoscalerContext asCtx = 
AutoscalerContext.getInstance();
-                               AbstractClusterMonitor monitor;
-                               monitor = asCtx.getClusterMonitor(clusterId);
-                               if (null == monitor) {
-                                       if (log.isDebugEnabled()) {
-                                               log.debug(String.format("A 
cluster monitor is not found in autoscaler context "
-                                                                       + 
"[cluster] %s", clusterId));
-                                       }
-                                       // if monitor does not exist, send 
cluster terminated event
-                                       
ClusterStatusEventPublisher.sendClusterTerminatedEvent(clusterTerminatingEvent.getAppId(),
-                                                                               
               clusterTerminatingEvent.getServiceName(),
-                                                                               
               clusterId, instanceId);
-                                       return;
-                               }
-                               //changing the status in the monitor, will 
notify its parent monitor
-                               if (monitor.getStatus() == 
ClusterStatus.Active) {
-                                       // terminated gracefully
-                                       
monitor.setStatus(ClusterStatus.Terminating);
-                                       
InstanceNotificationPublisher.sendInstanceCleanupEventForCluster(clusterId);
-                               } else {
-                                       
monitor.setStatus(ClusterStatus.Terminating);
-                                       monitor.terminateAllMembers();
-                               }
-                               
ServiceReferenceHolder.getInstance().getClusterStatusProcessorChain().
-                                               process("", clusterId, 
instanceId);
-                       }
-               });
-
-               topologyEventReceiver.addEventListener(new 
ClusterTerminatedEventListener() {
-                       @Override
-                       protected void onEvent(Event event) {
-                               log.info("[ClusterTerminatedEvent] Received: " 
+ event.getClass());
-                               ClusterTerminatedEvent clusterTerminatedEvent = 
(ClusterTerminatedEvent) event;
-                               String clusterId = 
clusterTerminatedEvent.getClusterId();
-                               AutoscalerContext asCtx = 
AutoscalerContext.getInstance();
-                               AbstractClusterMonitor monitor;
-                               monitor = asCtx.getClusterMonitor(clusterId);
-                               if (null == monitor) {
-                                       if (log.isDebugEnabled()) {
-                                               log.debug(String.format("A 
cluster monitor is not found in autoscaler context "
-                                                                       + 
"[cluster] %s", clusterId));
-                                       }
-                                       // if the cluster monitor is null, 
assume that its termianted
-                                       ApplicationMonitor appMonitor =
-                                                       
AutoscalerContext.getInstance().getAppMonitor(clusterTerminatedEvent.getAppId());
-                                       if (appMonitor != null) {
-                                               appMonitor
-                                                               
.onChildStatusEvent(new ClusterStatusEvent(ClusterStatus.Terminated, clusterId, 
null));
-                                       }
-                                       return;
-                               }
-                               //changing the status in the monitor, will 
notify its parent monitor
-                               monitor.setStatus(ClusterStatus.Terminated);
-                               //Destroying and Removing the Cluster monitor
-                               monitor.destroy();
-                               
AutoscalerContext.getInstance().removeClusterMonitor(clusterId);
-                       }
-               });
-
-               topologyEventReceiver.addEventListener(new 
MemberReadyToShutdownEventListener() {
-                       @Override
-                       protected void onEvent(Event event) {
-                               try {
-                                       MemberReadyToShutdownEvent 
memberReadyToShutdownEvent = (MemberReadyToShutdownEvent) event;
-                                       String clusterId = 
memberReadyToShutdownEvent.getClusterId();
-                                       AutoscalerContext asCtx = 
AutoscalerContext.getInstance();
-                                       AbstractClusterMonitor monitor;
-                                       monitor = 
asCtx.getClusterMonitor(clusterId);
-                                       if (null == monitor) {
-                                               if (log.isDebugEnabled()) {
-                                                       
log.debug(String.format("A cluster monitor is not found in autoscaler context "
-                                                                               
+ "[cluster] %s", clusterId));
-                                               }
-                                               return;
-                                       }
-                                       
monitor.handleMemberReadyToShutdownEvent(memberReadyToShutdownEvent);
-                               } catch (Exception e) {
-                                       String msg = "Error processing event " 
+ e.getLocalizedMessage();
-                                       log.error(msg, e);
-                               }
-                       }
-               });
-
-               topologyEventReceiver.addEventListener(new 
MemberStartedEventListener() {
-                       @Override
-                       protected void onEvent(Event event) {
-
-                       }
-               });
-
-               topologyEventReceiver.addEventListener(new 
MemberTerminatedEventListener() {
-                       @Override
-                       protected void onEvent(Event event) {
-                               try {
-                                       MemberTerminatedEvent 
memberTerminatedEvent = (MemberTerminatedEvent) event;
-                                       String clusterId = 
memberTerminatedEvent.getClusterId();
-                                       AbstractClusterMonitor monitor;
-                                       AutoscalerContext asCtx = 
AutoscalerContext.getInstance();
-                                       monitor = 
asCtx.getClusterMonitor(clusterId);
-                                       if (null == monitor) {
-                                               if (log.isDebugEnabled()) {
-                                                       
log.debug(String.format("A cluster monitor is not found in autoscaler context "
-                                                                               
+ "[cluster] %s", clusterId));
-                                               }
-                                               return;
-                                       }
-                                       
monitor.handleMemberTerminatedEvent(memberTerminatedEvent);
-                               } catch (Exception e) {
-                                       String msg = "Error processing event " 
+ e.getLocalizedMessage();
-                                       log.error(msg, e);
-                               }
-                       }
-               });
-
-               topologyEventReceiver.addEventListener(new 
MemberActivatedEventListener() {
-                       @Override
-                       protected void onEvent(Event event) {
-                               try {
-                                       MemberActivatedEvent 
memberActivatedEvent = (MemberActivatedEvent) event;
-                                       String clusterId = 
memberActivatedEvent.getClusterId();
-                                       AbstractClusterMonitor monitor;
-                                       AutoscalerContext asCtx = 
AutoscalerContext.getInstance();
-                                       monitor = 
asCtx.getClusterMonitor(clusterId);
-                                       if (null == monitor) {
-                                               if (log.isDebugEnabled()) {
-                                                       
log.debug(String.format("A cluster monitor is not found in autoscaler context "
-                                                                               
+ "[cluster] %s", clusterId));
-                                               }
-                                               return;
-                                       }
-                                       
monitor.handleMemberActivatedEvent(memberActivatedEvent);
-                               } catch (Exception e) {
-                                       String msg = "Error processing event " 
+ e.getLocalizedMessage();
-                                       log.error(msg, e);
-                               }
-                       }
-               });
-
-               topologyEventReceiver.addEventListener(new 
MemberMaintenanceListener() {
-                       @Override
-                       protected void onEvent(Event event) {
-                               try {
-                                       MemberMaintenanceModeEvent 
maintenanceModeEvent = (MemberMaintenanceModeEvent) event;
-                                       String clusterId = 
maintenanceModeEvent.getClusterId();
-                                       AbstractClusterMonitor monitor;
-                                       AutoscalerContext asCtx = 
AutoscalerContext.getInstance();
-                                       monitor = 
asCtx.getClusterMonitor(clusterId);
-                                       if (null == monitor) {
-                                               if (log.isDebugEnabled()) {
-                                                       
log.debug(String.format("A cluster monitor is not found in autoscaler context "
-                                                                               
+ "[cluster] %s", clusterId));
-                                               }
-                                               return;
-                                       }
-                                       
monitor.handleMemberMaintenanceModeEvent(maintenanceModeEvent);
-                               } catch (Exception e) {
-                                       String msg = "Error processing event " 
+ e.getLocalizedMessage();
-                                       log.error(msg, e);
-                               }
-                       }
-               });
-
-               topologyEventReceiver.addEventListener(new 
ClusterInstanceCreatedEventListener() {
-                       @Override
-                       protected void onEvent(Event event) {
-
-                               ClusterInstanceCreatedEvent 
clusterInstanceCreatedEvent =
-                                               (ClusterInstanceCreatedEvent) 
event;
-                               AbstractClusterMonitor clusterMonitor = 
AutoscalerContext.getInstance().
-                                               
getClusterMonitor(clusterInstanceCreatedEvent.getClusterId());
-
-                               if (clusterMonitor != null) {
-                                       
TopologyManager.acquireReadLockForCluster(clusterInstanceCreatedEvent.getServiceName(),
-                                                                               
  clusterInstanceCreatedEvent.getClusterId());
-
-                                       try {
-                                               Service service = 
TopologyManager.getTopology().
-                                                               
getService(clusterInstanceCreatedEvent.getServiceName());
-
-                                               if (service != null) {
-                                                       Cluster cluster = 
service.getCluster(clusterInstanceCreatedEvent.getClusterId());
-                                                       if (cluster != null) {
-                                                               // create and 
add Cluster Context
-                                                               try {
-                                                                       if 
(cluster.isKubernetesCluster()) {
-                                                                               
clusterMonitor.addClusterContextForInstance(
-                                                                               
                clusterInstanceCreatedEvent.getInstanceId(),
-                                                                               
                ClusterContextFactory.getKubernetesClusterContext(cluster));
-                                                                       } else 
if (cluster.isLbCluster()) {
-                                                                               
clusterMonitor.addClusterContextForInstance(
-                                                                               
                clusterInstanceCreatedEvent.getInstanceId(),
-                                                                               
                ClusterContextFactory.getVMLBClusterContext(cluster));
-                                                                       } else {
-                                                                               
clusterMonitor.addClusterContextForInstance(
-                                                                               
                clusterInstanceCreatedEvent.getInstanceId(),
-                                                                               
                ClusterContextFactory.getVMServiceClusterContext(cluster));
-                                                                       }
-
-                                                                       if 
(clusterMonitor.hasMonitoringStarted().compareAndSet(false, true)) {
-                                                                               
clusterMonitor.startScheduler();
-                                                                               
log.info("Monitoring task for Cluster Monitor with cluster id " +
-                                                                               
         clusterInstanceCreatedEvent.getClusterId() + " started successfully");
-                                                                       }
-
-                                                               } catch 
(PolicyValidationException e) {
-                                                                       
log.error(e.getMessage(), e);
-                                                               } catch 
(PartitionValidationException e) {
-                                                                       
log.error(e.getMessage(), e);
-                                                               }
-
-                                                       } else {
-                                                               
log.error("Cluster not found for " + clusterInstanceCreatedEvent.getClusterId() 
+
-                                                                         ", no 
cluster instance added to ClusterMonitor " +
-                                                                         
clusterInstanceCreatedEvent.getClusterId());
-                                                       }
-                                               } else {
-                                                       log.error("Service " + 
clusterInstanceCreatedEvent.getServiceName() +
-                                                                 " not found, 
no cluster instance added to ClusterMonitor " +
-                                                                 
clusterInstanceCreatedEvent.getClusterId());
-                                               }
-
-                                       } finally {
-                                               
TopologyManager.releaseReadLockForCluster(clusterInstanceCreatedEvent.getServiceName(),
-                                                                               
          clusterInstanceCreatedEvent.getClusterId());
-                                       }
-
-                               } else {
-                                       log.error("No Cluster Monitor found for 
cluster id " +
-                                                 
clusterInstanceCreatedEvent.getClusterId());
-                               }
-                       }
-               });
-       }
-
-       /**
-        * Terminate load balancer topology receiver thread.
-        */
-       public void terminate() {
-               topologyEventReceiver.terminate();
-               terminated = true;
-       }
-
-       protected synchronized void startApplicationMonitor(String 
applicationId) {
-               Thread th = null;
-               if 
(AutoscalerContext.getInstance().getAppMonitor(applicationId) == null) {
-                       th = new Thread(new 
ApplicationMonitorAdder(applicationId));
-               }
-               if (th != null) {
-                       th.start();
-               } else {
-                       if (log.isDebugEnabled()) {
-                               log.debug(String
-                                                         .format("Application 
monitor thread already exists: " +
-                                                                 
"[application] %s ", applicationId));
-                       }
-               }
-       }
->>>>>>> ddf277b... Remove unnessary threads in messaging model
 
        public ExecutorService getExecutorService() {
                return executorService;
@@ -987,8 +523,6 @@ public class AutoscalerTopologyEventReceiver {
        public void setExecutorService(ExecutorService executorService) {
                this.executorService = executorService;
        }
-<<<<<<< HEAD
-=======
 
        private class ApplicationMonitorAdder implements Runnable {
                private String appId;
@@ -1047,5 +581,5 @@ public class AutoscalerTopologyEventReceiver {
                        }
                }
        }
->>>>>>> ddf277b... Remove unnessary threads in messaging model
+
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/f70aa9ed/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
index bb5e167..91d52d3 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
@@ -39,6 +39,7 @@ import 
org.apache.stratos.autoscaler.util.ServiceReferenceHolder;
 import org.apache.stratos.cloud.controller.stub.domain.Partition;
 import org.apache.stratos.common.kubernetes.KubernetesGroup;
 import org.apache.stratos.common.threading.StratosThreadPool;
+import org.drools.reteoo.PartitionManager;
 import org.osgi.service.component.ComponentContext;
 import org.wso2.carbon.ntask.core.service.TaskService;
 import org.wso2.carbon.registry.api.RegistryException;
@@ -75,28 +76,27 @@ public class AutoscalerServerComponent {
 
 
        protected void activate(ComponentContext componentContext) throws 
Exception {
-<<<<<<< HEAD
-        try {
-            // Start topology receiver
-               XMLConfiguration conf = 
ConfUtil.getInstance(COMPONENTS_CONFIG).getConfiguration();
-               int threadPoolSize = conf.getInt(THREAD_POOL_SIZE_KEY, 
THREAD_POOL_SIZE);
-               String threadIdentifier=conf.getString(THREAD_IDENTIFIER_KEY, 
DEFAULT_IDENTIFIER);
-               ExecutorService executorService = 
StratosThreadPool.getExecutorService(threadIdentifier, threadPoolSize);
-            asTopologyReceiver = new AutoscalerTopologyEventReceiver();
-               asTopologyReceiver.setExecutorService(executorService);
-               asTopologyReceiver.execute();
+               try {
+                       // Start topology receiver
+                       XMLConfiguration conf = 
ConfUtil.getInstance(COMPONENTS_CONFIG).getConfiguration();
+                       int threadPoolSize = conf.getInt(THREAD_POOL_SIZE_KEY, 
THREAD_POOL_SIZE);
+                       String threadIdentifier = 
conf.getString(THREAD_IDENTIFIER_KEY, DEFAULT_IDENTIFIER);
+                       ExecutorService executorService = 
StratosThreadPool.getExecutorService(threadIdentifier, threadPoolSize);
+                       asTopologyReceiver = new 
AutoscalerTopologyEventReceiver();
+                       asTopologyReceiver.setExecutorService(executorService);
+                       asTopologyReceiver.execute();
 
-            if (log.isDebugEnabled()) {
-                log.debug("Topology receiver executor service started");
-            }
+                       if (log.isDebugEnabled()) {
+                               log.debug("Topology receiver executor service 
started");
+                       }
 
-            // Start health stat receiver
-            autoscalerHealthStatEventReceiver = new 
AutoscalerHealthStatEventReceiver();
-            Thread healthDelegatorThread = new 
Thread(autoscalerHealthStatEventReceiver);
-            healthDelegatorThread.start();
-            if (log.isDebugEnabled()) {
-                log.debug("Health statistics receiver thread started");
-            }
+                       // Start health stat receiver
+                       autoscalerHealthStatEventReceiver = new 
AutoscalerHealthStatEventReceiver();
+                       
autoscalerHealthStatEventReceiver.setExecutorService(executorService);
+                       autoscalerHealthStatEventReceiver.execute();
+                       if (log.isDebugEnabled()) {
+                               log.debug("Health statistics receiver thread 
started");
+                       }
 
             // Adding the registry stored partitions to the information model
             List<Partition> partitions = 
RegistryManager.getInstance().retrievePartitions();
@@ -105,7 +105,7 @@ public class AutoscalerServerComponent {
                 Partition partition = partitionIterator.next();
 //                
PartitionManager.getInstance().addPartitionToInformationModel(partition);
             }
-            
+
             // Adding the network partitions stored in registry to the 
information model
 //            List<NetworkPartitionLbHolder> nwPartitionHolders = 
RegistryManager.getInstance().retrieveNetworkPartitionLbHolders();
 //            Iterator<NetworkPartitionLbHolder> nwPartitionIterator = 
nwPartitionHolders.iterator();
@@ -113,7 +113,7 @@ public class AutoscalerServerComponent {
 //                NetworkPartitionLbHolder nwPartition = 
nwPartitionIterator.next();
 //                
PartitionManager.getInstance().addNetworkPartitionLbHolder(nwPartition);
 //            }
-            
+
             List<AutoscalePolicy> asPolicies = 
RegistryManager.getInstance().retrieveASPolicies();
             Iterator<AutoscalePolicy> asPolicyIterator = asPolicies.iterator();
             while (asPolicyIterator.hasNext()) {
@@ -121,43 +121,43 @@ public class AutoscalerServerComponent {
                 
PolicyManager.getInstance().addASPolicyToInformationModel(asPolicy);
             }
 
-            List<DeploymentPolicy> depPolicies = 
RegistryManager.getInstance().retrieveDeploymentPolicies();
-            Iterator<DeploymentPolicy> depPolicyIterator = 
depPolicies.iterator();
-            while (depPolicyIterator.hasNext()) {
-                DeploymentPolicy depPolicy = depPolicyIterator.next();
-                
PolicyManager.getInstance().addDeploymentPolicyToInformationModel(depPolicy);
-            }
+                       List<DeploymentPolicy> depPolicies = 
RegistryManager.getInstance().retrieveDeploymentPolicies();
+                       Iterator<DeploymentPolicy> depPolicyIterator = 
depPolicies.iterator();
+                       while (depPolicyIterator.hasNext()) {
+                               DeploymentPolicy depPolicy = 
depPolicyIterator.next();
+                               
PolicyManager.getInstance().addDeploymentPolicyToInformationModel(depPolicy);
+                       }
 
-            // Adding KubernetesGroups stored in registry to the information 
model
-            List<KubernetesGroup> kubernetesGroupList = 
RegistryManager.getInstance().retrieveKubernetesGroups();
-            Iterator<KubernetesGroup> kubernetesGroupIterator = 
kubernetesGroupList.iterator();
-            while (kubernetesGroupIterator.hasNext()) {
-                KubernetesGroup kubernetesGroup = 
kubernetesGroupIterator.next();
-                
KubernetesManager.getInstance().addNewKubernetesGroup(kubernetesGroup);
-            }
+                       // Adding KubernetesGroups stored in registry to the 
information model
+                       List<KubernetesGroup> kubernetesGroupList = 
RegistryManager.getInstance().retrieveKubernetesGroups();
+                       Iterator<KubernetesGroup> kubernetesGroupIterator = 
kubernetesGroupList.iterator();
+                       while (kubernetesGroupIterator.hasNext()) {
+                               KubernetesGroup kubernetesGroup = 
kubernetesGroupIterator.next();
+                               
KubernetesManager.getInstance().addNewKubernetesGroup(kubernetesGroup);
+                       }
 
-            //starting the processor chain
-            ClusterStatusProcessorChain clusterStatusProcessorChain = new 
ClusterStatusProcessorChain();
-            
ServiceReferenceHolder.getInstance().setClusterStatusProcessorChain(clusterStatusProcessorChain);
+                       //starting the processor chain
+                       ClusterStatusProcessorChain clusterStatusProcessorChain 
= new ClusterStatusProcessorChain();
+                       
ServiceReferenceHolder.getInstance().setClusterStatusProcessorChain(clusterStatusProcessorChain);
 
-            GroupStatusProcessorChain groupStatusProcessorChain = new 
GroupStatusProcessorChain();
-            
ServiceReferenceHolder.getInstance().setGroupStatusProcessorChain(groupStatusProcessorChain);
+                       GroupStatusProcessorChain groupStatusProcessorChain = 
new GroupStatusProcessorChain();
+                       
ServiceReferenceHolder.getInstance().setGroupStatusProcessorChain(groupStatusProcessorChain);
 
-            if (log.isInfoEnabled()) {
-                log.info("Scheduling tasks to publish applications");
-            }
+                       if (log.isInfoEnabled()) {
+                               log.info("Scheduling tasks to publish 
applications");
+                       }
 
-            ApplicationSynchronizerTaskScheduler
-                    .schedule(ServiceReferenceHolder.getInstance()
-                            .getTaskService());
+                       ApplicationSynchronizerTaskScheduler
+                                       
.schedule(ServiceReferenceHolder.getInstance()
+                                                                       
.getTaskService());
 
-            if (log.isInfoEnabled()) {
-                log.info("Autoscaler server Component activated");
-            }
-        } catch (Throwable e) {
-            log.error("Error in activating the autoscaler component ", e);
-        }
-    }
+                       if (log.isInfoEnabled()) {
+                               log.info("Autoscaler server Component 
activated");
+                       }
+               } catch (Throwable e) {
+                       log.error("Error in activating the autoscaler component 
", e);
+               }
+       }
 
     protected void deactivate(ComponentContext context) {
         asTopologyReceiver.terminate();
@@ -198,128 +198,4 @@ public class AutoscalerServerComponent {
         ServiceReferenceHolder.getInstance().setTaskService(null);
     }
 }
-=======
-               try {
-                       // Start topology receiver
-                       XMLConfiguration conf = 
ConfUtil.getInstance(COMPONENTS_CONFIG).getConfiguration();
-                       int threadPoolSize = conf.getInt(THREAD_POOL_SIZE_KEY, 
THREAD_POOL_SIZE);
-                       String threadIdentifier = 
conf.getString(THREAD_IDENTIFIER_KEY, DEFAULT_IDENTIFIER);
-                       ExecutorService executorService = 
StratosThreadPool.getExecutorService(threadIdentifier, threadPoolSize);
-                       asTopologyReceiver = new 
AutoscalerTopologyEventReceiver();
-                       asTopologyReceiver.setExecutorService(executorService);
-                       asTopologyReceiver.execute();
-
-                       if (log.isDebugEnabled()) {
-                               log.debug("Topology receiver executor service 
started");
-                       }
-
-                       // Start health stat receiver
-                       autoscalerHealthStatEventReceiver = new 
AutoscalerHealthStatEventReceiver();
-                       
autoscalerHealthStatEventReceiver.setExecutorService(executorService);
-                       autoscalerHealthStatEventReceiver.execute();
-                       if (log.isDebugEnabled()) {
-                               log.debug("Health statistics receiver thread 
started");
-                       }
-
-                       // Adding the registry stored partitions to the 
information model
-                       List<Partition> partitions = 
RegistryManager.getInstance().retrievePartitions();
-                       Iterator<Partition> partitionIterator = 
partitions.iterator();
-                       while (partitionIterator.hasNext()) {
-                               Partition partition = partitionIterator.next();
-                               
PartitionManager.getInstance().addPartitionToInformationModel(partition);
-                       }
-
-                       // Adding the network partitions stored in registry to 
the information model
-                       List<NetworkPartitionLbHolder> nwPartitionHolders =
-                                       
RegistryManager.getInstance().retrieveNetworkPartitionLbHolders();
-                       Iterator<NetworkPartitionLbHolder> nwPartitionIterator 
= nwPartitionHolders.iterator();
-                       while (nwPartitionIterator.hasNext()) {
-                               NetworkPartitionLbHolder nwPartition = 
nwPartitionIterator.next();
-                               
PartitionManager.getInstance().addNetworkPartitionLbHolder(nwPartition);
-                       }
-
-                       List<AutoscalePolicy> asPolicies = 
RegistryManager.getInstance().retrieveASPolicies();
-                       Iterator<AutoscalePolicy> asPolicyIterator = 
asPolicies.iterator();
-                       while (asPolicyIterator.hasNext()) {
-                               AutoscalePolicy asPolicy = 
asPolicyIterator.next();
-                               
PolicyManager.getInstance().addASPolicyToInformationModel(asPolicy);
-                       }
-
-                       List<DeploymentPolicy> depPolicies = 
RegistryManager.getInstance().retrieveDeploymentPolicies();
-                       Iterator<DeploymentPolicy> depPolicyIterator = 
depPolicies.iterator();
-                       while (depPolicyIterator.hasNext()) {
-                               DeploymentPolicy depPolicy = 
depPolicyIterator.next();
-                               
PolicyManager.getInstance().addDeploymentPolicyToInformationModel(depPolicy);
-                       }
-
-                       // Adding KubernetesGroups stored in registry to the 
information model
-                       List<KubernetesGroup> kubernetesGroupList = 
RegistryManager.getInstance().retrieveKubernetesGroups();
-                       Iterator<KubernetesGroup> kubernetesGroupIterator = 
kubernetesGroupList.iterator();
-                       while (kubernetesGroupIterator.hasNext()) {
-                               KubernetesGroup kubernetesGroup = 
kubernetesGroupIterator.next();
-                               
KubernetesManager.getInstance().addNewKubernetesGroup(kubernetesGroup);
-                       }
-
-                       //starting the processor chain
-                       ClusterStatusProcessorChain clusterStatusProcessorChain 
= new ClusterStatusProcessorChain();
-                       
ServiceReferenceHolder.getInstance().setClusterStatusProcessorChain(clusterStatusProcessorChain);
 
-                       GroupStatusProcessorChain groupStatusProcessorChain = 
new GroupStatusProcessorChain();
-                       
ServiceReferenceHolder.getInstance().setGroupStatusProcessorChain(groupStatusProcessorChain);
-
-                       if (log.isInfoEnabled()) {
-                               log.info("Scheduling tasks to publish 
applications");
-                       }
-
-                       ApplicationSynchronizerTaskScheduler
-                                       
.schedule(ServiceReferenceHolder.getInstance()
-                                                                       
.getTaskService());
-
-                       if (log.isInfoEnabled()) {
-                               log.info("Autoscaler server Component 
activated");
-                       }
-               } catch (Throwable e) {
-                       log.error("Error in activating the autoscaler component 
", e);
-               }
-       }
-
-       protected void deactivate(ComponentContext context) {
-               asTopologyReceiver.terminate();
-               autoscalerHealthStatEventReceiver.terminate();
-       }
-
-       protected void setRegistryService(RegistryService registryService) {
-               if (log.isDebugEnabled()) {
-                       log.debug("Setting the Registry Service");
-               }
-               try {
-                       
ServiceReferenceHolder.getInstance().setRegistry(registryService.getGovernanceSystemRegistry());
-               } catch (RegistryException e) {
-                       String msg = "Failed when retrieving Governance System 
Registry.";
-                       log.error(msg, e);
-                       throw new AutoScalerException(msg, e);
-               }
-       }
-
-       protected void unsetRegistryService(RegistryService registryService) {
-               if (log.isDebugEnabled()) {
-                       log.debug("Un-setting the Registry Service");
-               }
-               ServiceReferenceHolder.getInstance().setRegistry(null);
-       }
-
-       protected void setTaskService(TaskService taskService) {
-               if (log.isDebugEnabled()) {
-                       log.debug("Setting the Task Service");
-               }
-               
ServiceReferenceHolder.getInstance().setTaskService(taskService);
-       }
-
-       protected void unsetTaskService(TaskService taskService) {
-               if (log.isDebugEnabled()) {
-                       log.debug("Un-setting the Task Service");
-               }
-               ServiceReferenceHolder.getInstance().setTaskService(null);
-       }
-}
->>>>>>> ddf277b... Remove unnessary threads in messaging model

http://git-wip-us.apache.org/repos/asf/stratos/blob/f70aa9ed/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
index a413218..700efb9 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
@@ -20,41 +20,23 @@ package org.apache.stratos.cloud.controller.internal;
  *
 */
 
-<<<<<<< HEAD
-<<<<<<< HEAD
+import org.apache.commons.configuration.XMLConfiguration;
 
-<<<<<<< HEAD
 import com.hazelcast.core.HazelcastInstance;
 
-=======
->>>>>>> ddf277b... Remove unnessary threads in messaging model
-=======
-
->>>>>>> ad3e45c... Remove unnessary threads in messaging model
-=======
-import org.apache.commons.configuration.XMLConfiguration;
->>>>>>> 1b26a96... Adding executor service for threads and remove unnecessary 
threads
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.cloud.controller.context.CloudControllerContext;
 import 
org.apache.stratos.cloud.controller.messaging.receiver.application.ApplicationTopicReceiver;
 import 
org.apache.stratos.cloud.controller.messaging.receiver.cluster.status.ClusterStatusTopicReceiver;
 import org.apache.stratos.cloud.controller.exception.CloudControllerException;
-<<<<<<< HEAD
 import org.apache.stratos.cloud.controller.services.CloudControllerService;
 import 
org.apache.stratos.cloud.controller.services.impl.CloudControllerServiceImpl;
 import 
org.apache.stratos.cloud.controller.messaging.publisher.TopologySynchronizerTaskScheduler;
 import 
org.apache.stratos.cloud.controller.messaging.receiver.instance.status.InstanceStatusTopicReceiver;
 import org.apache.stratos.common.clustering.DistributedObjectProvider;
-=======
-import org.apache.stratos.cloud.controller.impl.CloudControllerServiceImpl;
-import org.apache.stratos.cloud.controller.interfaces.CloudControllerService;
-import 
org.apache.stratos.cloud.controller.publisher.TopologySynchronizerTaskScheduler;
-import 
org.apache.stratos.cloud.controller.receiver.instance.status.InstanceStatusTopicReceiver;
-import org.apache.stratos.cloud.controller.util.ServiceReferenceHolder;
 import org.apache.stratos.common.threading.StratosThreadPool;
 import org.apache.stratos.common.util.ConfUtil;
->>>>>>> 1b26a96... Adding executor service for threads and remove unnecessary 
threads
 import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
 import org.apache.stratos.messaging.util.Util;
 import org.osgi.framework.BundleContext;
@@ -72,7 +54,6 @@ import java.util.concurrent.ExecutorService;
  * Registering Cloud Controller Service.
  *
  * @scr.component name="org.apache.stratos.cloud.controller" immediate="true"
-<<<<<<< HEAD
  * @scr.reference name="distributedObjectProvider" 
interface="org.apache.stratos.common.clustering.DistributedObjectProvider"
  *                cardinality="1..1" policy="dynamic" 
bind="setDistributedObjectProvider" unbind="unsetDistributedObjectProvider"
  * @scr.reference name="ntask.component" 
interface="org.wso2.carbon.ntask.core.service.TaskService"
@@ -81,24 +62,6 @@ import java.util.concurrent.ExecutorService;
  *                cardinality="1..1" policy="dynamic" 
bind="setRegistryService" unbind="unsetRegistryService"
  * @scr.reference name="config.context.service" 
interface="org.wso2.carbon.utils.ConfigurationContextService"
  *                cardinality="1..1" policy="dynamic" 
bind="setConfigurationContextService" unbind="unsetConfigurationContextService"
-=======
- * @scr.reference name="distributedMapProvider" 
interface="org.wso2.carbon.caching.impl.DistributedMapProvider"
- *                cardinality="1..1" policy="dynamic" 
bind="setDistributedMapProvider" unbind="unsetDistributedMapProvider"
- * @scr.reference name="ntask.component"
- *                interface="org.wso2.carbon.ntask.core.service.TaskService"
- *                cardinality="1..1" policy="dynamic" bind="setTaskService" 
unbind="unsetTaskService"
- * @scr.reference name="registry.service"
- *                
interface="org.wso2.carbon.registry.core.service.RegistryService"
- *                cardinality="1..1" policy="dynamic" 
bind="setRegistryService" unbind="unsetRegistryService"
- * @scr.reference name="config.context.service"
-<<<<<<< HEAD
- * interface="org.wso2.carbon.utils.ConfigurationContextService"
- * cardinality="1..1" policy="dynamic" bind="setConfigurationContextService" 
unbind="unsetConfigurationContextService"
->>>>>>> ddf277b... Remove unnessary threads in messaging model
-=======
- *                interface="org.wso2.carbon.utils.ConfigurationContextService"
- *                cardinality="1..1" policy="dynamic" 
bind="setConfigurationContextService" unbind="unsetConfigurationContextService"
->>>>>>> ad3e45c... Remove unnessary threads in messaging model
  */
 public class CloudControllerServiceComponent {
 
@@ -107,8 +70,8 @@ public class CloudControllerServiceComponent {
        private InstanceStatusTopicReceiver instanceStatusTopicReceiver;
        private ApplicationTopicReceiver applicationTopicReceiver;
        private static final String THREAD_IDENTIFIER_KEY = 
"threadPool.autoscaler.identifier";
-       private static final String DEFAULT_IDENTIFIER = "Auto-Scaler";
-       private static final String THREAD_POOL_SIZE_KEY = 
"threadPool.autoscaler.threadPoolSize";
+       private static final String DEFAULT_IDENTIFIER = "Cloud-Controller";
+       private static final String THREAD_POOL_SIZE_KEY = 
"threadPool.cloudcontroller.threadPoolSize";
        private static final String COMPONENTS_CONFIG = "stratos-config";
        private static final int THREAD_POOL_SIZE = 10;
 
@@ -127,28 +90,16 @@ public class CloudControllerServiceComponent {
                                log.info("Application Receiver thread started");
                        }
 
-<<<<<<< HEAD
-            if (log.isInfoEnabled()) {
-                log.info("Application event receiver thread started");
-            }
-=======
                        clusterStatusTopicReceiver = new 
ClusterStatusTopicReceiver();
                        
clusterStatusTopicReceiver.setExecutorService(executorService);
                        clusterStatusTopicReceiver.execute();
->>>>>>> ddf277b... Remove unnessary threads in messaging model
 
                        if (log.isInfoEnabled()) {
                                log.info("Cluster status Receiver thread 
started");
                        }
 
-<<<<<<< HEAD
-            if (log.isInfoEnabled()) {
-                log.info("Cluster status receiver thread started");
-            }
-=======
                        instanceStatusTopicReceiver = new 
InstanceStatusTopicReceiver();
                        instanceStatusTopicReceiver.execute();
->>>>>>> ddf277b... Remove unnessary threads in messaging model
 
                        if (log.isInfoEnabled()) {
                                log.info("Instance status message receiver 
thread started");
@@ -163,23 +114,15 @@ public class CloudControllerServiceComponent {
                                log.info("Scheduling tasks");
                        }
 
-<<<<<<< HEAD
-            if(log.isInfoEnabled()) {
-                log.info("Scheduling tasks");
-            }
+                       TopologySynchronizerTaskScheduler
+                                       
.schedule(ServiceReferenceHolder.getInstance()
+                                                                       
.getTaskService());
 
-            if ((!CloudControllerContext.getInstance().isClustered()) ||
-                    (CloudControllerContext.getInstance().isCoordinator())) {
-                
TopologySynchronizerTaskScheduler.schedule(ServiceReferenceHolder.getInstance().getTaskService());
-                if(log.isInfoEnabled()) {
-                    log.info("Topology synchronizer task scheduled");
-                }
-            }
-        } catch (Throwable e) {
-            log.error("**** Cloud controller service bundle is failed to 
activate ****", e);
+               } catch (Throwable e) {
+                       log.error("******* Cloud Controller Service bundle is 
failed to activate ****", e);
         }
     }
-    
+
     protected void setTaskService(TaskService taskService) {
         if (log.isDebugEnabled()) {
             log.debug("Setting the Task Service");
@@ -193,32 +136,7 @@ public class CloudControllerServiceComponent {
         }
         ServiceReferenceHolder.getInstance().setTaskService(null);
     }
-    
-=======
-                       TopologySynchronizerTaskScheduler
-                                       
.schedule(ServiceReferenceHolder.getInstance()
-                                                                       
.getTaskService());
-
-               } catch (Throwable e) {
-                       log.error("******* Cloud Controller Service bundle is 
failed to activate ****", e);
-               }
-       }
-
-       protected void setTaskService(TaskService taskService) {
-               if (log.isDebugEnabled()) {
-                       log.debug("Setting the Task Service");
-               }
-               
ServiceReferenceHolder.getInstance().setTaskService(taskService);
-       }
 
-       protected void unsetTaskService(TaskService taskService) {
-               if (log.isDebugEnabled()) {
-                       log.debug("Unsetting the Task Service");
-               }
-               ServiceReferenceHolder.getInstance().setTaskService(null);
-       }
-
->>>>>>> ddf277b... Remove unnessary threads in messaging model
        protected void setRegistryService(RegistryService registryService) {
                if (log.isDebugEnabled()) {
                        log.debug("Setting the Registry Service");
@@ -226,39 +144,22 @@ public class CloudControllerServiceComponent {
 
                try {
                        UserRegistry registry = 
registryService.getGovernanceSystemRegistry();
-<<<<<<< HEAD
                ServiceReferenceHolder.getInstance().setRegistry(registry);
         } catch (RegistryException e) {
                String msg = "Failed when retrieving Governance System 
Registry.";
                log.error(msg, e);
                throw new CloudControllerException(msg, e);
-        } 
-=======
-                       ServiceReferenceHolder.getInstance()
-                                             .setRegistry(registry);
-               } catch (RegistryException e) {
-                       String msg = "Failed when retrieving Governance System 
Registry.";
-                       log.error(msg, e);
-                       throw new CloudControllerException(msg, e);
-               }
->>>>>>> ddf277b... Remove unnessary threads in messaging model
+        }
        }
 
        protected void unsetRegistryService(RegistryService registryService) {
                if (log.isDebugEnabled()) {
-<<<<<<< HEAD
             log.debug("Un-setting the Registry Service");
         }
         ServiceReferenceHolder.getInstance().setRegistry(null);
-=======
-                       log.debug("Unsetting the Registry Service");
-               }
-               ServiceReferenceHolder.getInstance().setRegistry(null);
->>>>>>> ddf277b... Remove unnessary threads in messaging model
        }
 
        protected void 
setConfigurationContextService(ConfigurationContextService cfgCtxService) {
-<<<<<<< HEAD
         ServiceReferenceHolder.getInstance().setAxisConfiguration(
                 cfgCtxService.getServerConfigContext().getAxisConfiguration());
     }
@@ -274,27 +175,9 @@ public class CloudControllerServiceComponent {
     protected void unsetDistributedObjectProvider(DistributedObjectProvider 
distributedObjectProvider) {
         
ServiceReferenceHolder.getInstance().setDistributedObjectProvider(null);
     }
-       
-=======
-               ServiceReferenceHolder.getInstance().setAxisConfiguration(
-                               
cfgCtxService.getServerConfigContext().getAxisConfiguration());
-       }
-
-       protected void 
unsetConfigurationContextService(ConfigurationContextService cfgCtxService) {
-               ServiceReferenceHolder.getInstance().setAxisConfiguration(null);
-       }
-
-       protected void setDistributedMapProvider(DistributedMapProvider 
mapProvider) {
-               
ServiceReferenceHolder.getInstance().setDistributedMapProvider(mapProvider);
-       }
-
-       protected void unsetDistributedMapProvider(DistributedMapProvider 
mapProvider) {
-               
ServiceReferenceHolder.getInstance().setDistributedMapProvider(null);
-       }
 
->>>>>>> ddf277b... Remove unnessary threads in messaging model
        protected void deactivate(ComponentContext ctx) {
-               // Close event publisher connections to message broker
-               
EventPublisherPool.close(Util.Topics.TOPOLOGY_TOPIC.getTopicName());
+        // Close event publisher connections to message broker
+        EventPublisherPool.close(Util.Topics.TOPOLOGY_TOPIC.getTopicName());
        }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/stratos/blob/f70aa9ed/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
index 3aa77a8..509bc74 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
@@ -120,30 +120,13 @@ public class LoadBalancerServiceComponent {
             TopologyFilterConfigurator.configure(configuration);
 
             if (configuration.isMultiTenancyEnabled()) {
-<<<<<<< HEAD
                 // Start tenant event receiver
                 startTenantEventReceiver();
-=======
-
-                tenantReceiver = new LoadBalancerTenantEventReceiver();
-                               tenantReceiver.execute();
-
-                if (log.isInfoEnabled()) {
-                    log.info("Tenant receiver thread started");
-                }
->>>>>>> ae876c1... Remove unnessary threads in messaging model
             }
 
             if (configuration.isTopologyEventListenerEnabled()) {
                 // Start topology receiver
-<<<<<<< HEAD
                 startTopologyEventReceiver();
-=======
-                topologyReceiver = new LoadBalancerTopologyEventReceiver();
-                topologyReceiver.execute();
-                if (log.isInfoEnabled()) {
-                    log.info("Topology receiver thread started");
-                }
 
                 if (log.isInfoEnabled()) {
                     if (TopologyServiceFilter.getInstance().isActive()) {
@@ -177,7 +160,7 @@ public class LoadBalancerServiceComponent {
                         log.info(String.format("Member filter activated: [lb-cluster-ids] %s", sb.toString()));
                     }
                 }
->>>>>>> ae876c1... Remove unnessary threads in messaging model
+
             }
 
             if(configuration.isCepStatsPublisherEnabled()) {
@@ -197,18 +180,16 @@ public class LoadBalancerServiceComponent {
     }
 
     private void startTenantEventReceiver() {
-        tenantReceiver = new LoadBalancerTenantEventReceiver();
-        Thread tenantReceiverThread = new Thread(tenantReceiver);
-        tenantReceiverThread.start();
+           tenantReceiver = new LoadBalancerTenantEventReceiver();
+           tenantReceiver.execute();
         if (log.isInfoEnabled()) {
             log.info("Tenant receiver thread started");
         }
     }
 
     private void startTopologyEventReceiver() {
-        topologyReceiver = new LoadBalancerTopologyEventReceiver();
-        Thread topologyReceiverThread = new Thread(topologyReceiver);
-        topologyReceiverThread.start();
+           topologyReceiver = new LoadBalancerTopologyEventReceiver();
+           topologyReceiver.execute();
         if (log.isInfoEnabled()) {
             log.info("Topology receiver thread started");
         }

http://git-wip-us.apache.org/repos/asf/stratos/blob/f70aa9ed/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
----------------------------------------------------------------------
diff --git a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
index 5ee28d1..8bfcb2c 100644
--- a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
+++ b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
@@ -107,7 +107,6 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
                        log.debug("Member not found in the toplogy. Event rejected");
                        return;
                }
-<<<<<<< HEAD
         if (StringUtils.isNotEmpty(id)) {
             memberTimeStampMap.put(id, event.getTimeStamp());
         } else {
@@ -320,220 +319,4 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
     public ConcurrentHashMap<String, Long> getMemberTimeStampMap() {
         return memberTimeStampMap;
     }
-=======
-               if (StringUtils.isNotEmpty(id)) {
-                       memberTimeStampMap.put(id, event.getTimeStamp());
-               } else {
-                       log.warn("NULL member id found in the event received. 
Event rejected.");
-               }
-               if (log.isDebugEnabled()) {
-                       log.debug("Event received from [member-id] " + id + " 
[time-stamp] " + event.getTimeStamp());
-               }
-       }
-
-       @Override
-       public Iterator<StreamEvent> iterator() {
-               return window.iterator();
-       }
-
-       @Override
-       public Iterator<StreamEvent> iterator(String predicate) {
-               if (siddhiContext.isDistributedProcessingEnabled()) {
-                       return ((SchedulerSiddhiQueueGrid<StreamEvent>) 
window).iterator(predicate);
-               } else {
-                       return window.iterator();
-               }
-       }
-
-       /**
-        * Retrieve the current activated members from the topology and 
initialize the time stamp map.
-        * This will allow the system to recover from a restart
-        *
-        * @param topology Topology model object
-        */
-       boolean loadTimeStampMapFromTopology(Topology topology) {
-
-               long currentTimeStamp = System.currentTimeMillis();
-               if (topology == null || topology.getServices() == null) {
-                       return false;
-               }
-               // TODO make this efficient by adding APIs to messaging 
component
-               for (Service service : topology.getServices()) {
-                       if (service.getClusters() != null) {
-                               for (Cluster cluster : service.getClusters()) {
-                                       if (cluster.getMembers() != null) {
-                                               for (Member member : 
cluster.getMembers()) {
-                                                       // we are checking 
faulty status only in previously activated members
-                                                       if (member != null && 
MemberStatus.Activated.equals(member.getStatus())) {
-                                                               // Initialize 
the member time stamp map from the topology at the beginning
-                                                               
memberTimeStampMap.putIfAbsent(member.getMemberId(), currentTimeStamp);
-                                                       }
-                                               }
-                                       }
-                               }
-                       }
-               }
-
-               log.info("Member time stamp map was successfully loaded from 
the topology.");
-               if (log.isDebugEnabled()) {
-                       log.debug("Member TimeStamp Map: " + 
memberTimeStampMap);
-               }
-               return true;
-       }
-
-       private Member getMemberFromId(String memberId) {
-               if (StringUtils.isEmpty(memberId)) {
-                       return null;
-               }
-               if (TopologyManager.getTopology().isInitialized()) {
-                       try {
-                               TopologyManager.acquireReadLock();
-                               if (TopologyManager.getTopology().getServices() 
== null) {
-                                       return null;
-                               }
-                               // TODO make this efficient by adding APIs to 
messaging component
-                               for (Service service : 
TopologyManager.getTopology().getServices()) {
-                                       if (service.getClusters() != null) {
-                                               for (Cluster cluster : 
service.getClusters()) {
-                                                       if 
(cluster.getMembers() != null) {
-                                                               for (Member 
member : cluster.getMembers()) {
-                                                                       if 
(memberId.equals(member.getMemberId())) {
-                                                                               
return member;
-                                                                       }
-                                                               }
-                                                       }
-                                               }
-                                       }
-                               }
-                       } catch (Exception e) {
-                               log.error("Error while reading topology" + e);
-                       } finally {
-                               TopologyManager.releaseReadLock();
-                       }
-               }
-               return null;
-       }
-
-       private void publishMemberFault(String memberId) {
-               Member member = getMemberFromId(memberId);
-               if (member == null) {
-                       log.error("Failed to publish member fault event. Member 
having [member-id] " + memberId +
-                                 " does not exist in topology");
-                       return;
-               }
-               log.info("Publishing member fault event for [member-id] " + 
memberId);
-
-               MemberFaultEvent memberFaultEvent =
-                               new MemberFaultEvent(member.getClusterId(), 
member.getInstanceId(), member.getMemberId(),
-                                                    member.getPartitionId(), 
0);
-
-               memberFaultEventMessageMap.put("message", memberFaultEvent);
-               healthStatPublisher.publish(MemberFaultEventMap, true);
-       }
-
-       @Override
-       public void run() {
-               try {
-                       threadBarrier.pass();
-
-                       for (Object o : memberTimeStampMap.entrySet()) {
-                               Map.Entry pair = (Map.Entry) o;
-                               long currentTime = System.currentTimeMillis();
-                               Long eventTimeStamp = (Long) pair.getValue();
-
-                               if ((currentTime - eventTimeStamp) > TIME_OUT) {
-                                       log.info("Faulty member detected 
[member-id] " + pair.getKey() + " with [last time-stamp] " +
-                                                eventTimeStamp + " [time-out] 
" + TIME_OUT + " milliseconds");
-                                       publishMemberFault((String) 
pair.getKey());
-                               }
-                       }
-                       if (log.isDebugEnabled()) {
-                               log.debug("Fault handling processor iteration 
completed with [time-stamp map length] " +
-                                         memberTimeStampMap.size() + " 
[time-stamp map] " + memberTimeStampMap);
-                       }
-               } catch (Throwable t) {
-                       log.error(t.getMessage(), t);
-               } finally {
-                       faultHandleScheduler.schedule(this, timeToKeep, 
TimeUnit.MILLISECONDS);
-               }
-       }
-
-       @Override
-       protected Object[] currentState() {
-               return new Object[] { window.currentState() };
-       }
-
-       @Override
-       protected void restoreState(Object[] data) {
-               window.restoreState(data);
-               window.restoreState((Object[]) data[0]);
-               window.reSchedule();
-       }
-
-       @Override
-       protected void init(Expression[] parameters, QueryPostProcessingElement 
nextProcessor,
-                           AbstractDefinition streamDefinition, String 
elementId, boolean async,
-                           SiddhiContext siddhiContext) {
-
-               if (parameters[0] instanceof IntConstant) {
-                       timeToKeep = ((IntConstant) parameters[0]).getValue();
-               } else {
-                       timeToKeep = ((LongConstant) parameters[0]).getValue();
-               }
-
-               String memberIdAttrName = ((Variable) 
parameters[1]).getAttributeName();
-               memberIdAttrIndex = 
streamDefinition.getAttributePosition(memberIdAttrName);
-
-               if (this.siddhiContext.isDistributedProcessingEnabled()) {
-                       window = new 
SchedulerSiddhiQueueGrid<StreamEvent>(elementId, this, this.siddhiContext, 
this.async);
-               } else {
-                       window = new SchedulerSiddhiQueue<StreamEvent>(this);
-               }
-               MemberFaultEventMap
-                               
.put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent", 
memberFaultEventMessageMap);
-
-               ExecutorService executorService = 
StratosThreadPool.getExecutorService(IDENTIFIER, 10);
-               cepTopologyEventReceiver.setExecutorService(executorService);
-               executorService.execute(cepTopologyEventReceiver);
-
-               //Ordinary scheduling
-               window.schedule();
-               if (log.isDebugEnabled()) {
-                       log.debug("Fault handling window processor initialized 
with [timeToKeep] " + timeToKeep +
-                                 ", [memberIdAttrName] " + memberIdAttrName + 
", [memberIdAttrIndex] " + memberIdAttrIndex +
-                                 ", [distributed-enabled] " + 
this.siddhiContext.isDistributedProcessingEnabled());
-               }
-       }
-
-       @Override
-       public void schedule() {
-               faultHandleScheduler.schedule(this, timeToKeep, 
TimeUnit.MILLISECONDS);
-       }
-
-       @Override
-       public void scheduleNow() {
-               faultHandleScheduler.schedule(this, 0, TimeUnit.MILLISECONDS);
-       }
-
-       @Override
-       public void setScheduledExecutorService(ScheduledExecutorService 
scheduledExecutorService) {
-               this.faultHandleScheduler = scheduledExecutorService;
-       }
-
-       @Override
-       public void setThreadBarrier(ThreadBarrier threadBarrier) {
-               this.threadBarrier = threadBarrier;
-       }
-
-       @Override
-       public void destroy() {
-               // terminate topology listener thread
-               cepTopologyEventReceiver.terminate();
-               window = null;
-       }
-
-       public ConcurrentHashMap<String, Long> getMemberTimeStampMap() {
-               return memberTimeStampMap;
-       }
->>>>>>> ddf277b... Remove unnessary threads in messaging model
 }

Reply via email to